diff --git a/experiment.py b/experiment.py index def0b8c..92aeeb7 100644 --- a/experiment.py +++ b/experiment.py @@ -29,6 +29,7 @@ async def countdown(n): except giambio.CancelledError: print("countdown cancelled!") + async def count(stop, step=1): try: x = 0 @@ -41,21 +42,15 @@ async def count(stop, step=1): except giambio.CancelledError: print("count cancelled!") + async def main(): print("Spawning countdown immediately, scheduling count for 4 secs from now") - task = loop.spawn(countdown(8)) - task1 = loop.schedule(count(6, 2), 4) # Schedules the task, it will be ran 4 seconds from now - await giambio.sleep(2) # TODO: Fix this to avoid the need to use a checkpoint before cancelling -# await task.cancel() - result = await task.join() # Would raise TaskCancelled if the task was cancelled - result = 'Task cancelled' if task.cancelled else task.result.val - result1 = await task1.join() - print(f"countdown returned: {result}\ncount returned: {result1}") - print("All done") - print("PT. 2 Context Manager") async with giambio.TaskManager(loop) as manager: - task2 = await manager.spawn(countdown(8)) - task3 = await manager.schedule(count(16, 2), 4) - print(manager.values) + task = await manager.spawn(countdown(4)) + await manager.schedule(count(8, 2), 4) + await task.cancel() + for task, ret in manager.values.items(): + print(f"Function '{task.coroutine.__name__}' at {hex(id(task.coroutine))} returned an object of type '{type(ret).__name__}': {repr(ret)}") + loop.start(main) diff --git a/giambio/__init__.py b/giambio/__init__.py index 90235a6..2ba9e1c 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -1,8 +1,8 @@ __author__ = "Nocturn9x aka Isgiambyy" __version__ = (0, 0, 1) from .core import EventLoop -from .exceptions import GiambioError, AlreadyJoinedError, CancelledError, TaskCancelled -from .traps import sleep, join +from .exceptions import GiambioError, AlreadyJoinedError, CancelledError from .util import TaskManager +from .traps import _sleep as sleep -__all__ = ["EventLoop", "sleep", "join", "GiambioError", "AlreadyJoinedError", "CancelledError", "TaskManager", "TaskCancelled"] +__all__ = ["EventLoop", "GiambioError", "AlreadyJoinedError", "CancelledError", "TaskManager", "sleep"] diff --git a/giambio/abstractions.py b/giambio/abstractions.py index 465dc2d..7ee1a96 100644 --- a/giambio/abstractions.py +++ b/giambio/abstractions.py @@ -1,6 +1,6 @@ import types -from .traps import join, sleep, want_read, want_write, cancel -from .exceptions import CancelledError +from .traps import _join, _cancel, _sleep + class Result: """A wrapper for results of coroutines""" @@ -19,15 +19,17 @@ class Task: def __init__(self, coroutine: types.coroutine, loop): self.coroutine = coroutine - self.status = False # Not ran yet - self.joined = False + self.status = False # Not ran yet + self.joined = False # True if the task is joined self.result = None # Updated when the coroutine execution ends self.loop = loop # The EventLoop object that spawned the task - self.cancelled = False - self.execution = "INIT" - self.steps = 0 # How many steps did the task do before ending, incremented while executing + self.cancelled = False # True if the task gets cancelled + self.execution = "INIT" # Is set to 'FINISH' when the task ends + self.steps = 0 # How many steps did the task run before ending, incremented while executing def run(self): + """Simple abstraction layer over the coroutines ``send`` method""" + self.status = True return self.coroutine.send(None) @@ -35,20 +37,28 @@ class Task: return f"giambio.core.Task({self.coroutine}, {self.status}, {self.joined}, {self.result})" def throw(self, exception: Exception): + """Simple abstraction layer over the coroutines ``throw`` method""" + self.result = Result(None, exception) return self.coroutine.throw(exception) async def cancel(self): - return await cancel(self) + """Cancels the task, throwing inside it a ``giambio.exceptions.CancelledError`` exception + and discarding whatever the function could return""" + + await _sleep(0) # Switch tasks (_sleep with 0 as delay merely acts as a checkpoint) or everything breaks: is it a good solution? + return await _cancel(self) async def join(self, silent=False): - return await join(self, silent) + return await _join(self, silent) - def get_result(self): + def get_result(self, silenced=False): if self.result: - if self.result.exc: - raise self.result.exc - else: - return self.result.val + if not silenced: + if self.result.exc: + raise self.result.exc + else: + return self.result.val + return self.result.val if self.result.val else self.result.exc diff --git a/giambio/core.py b/giambio/core.py index 97088ed..aacf1ca 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -9,7 +9,7 @@ from time import sleep as wait from .socket import AsyncSocket, WantRead, WantWrite from .abstractions import Task, Result from socket import SOL_SOCKET, SO_ERROR -from .traps import join, sleep, want_read, want_write, cancel +from .traps import _join, _sleep, _want_read, _want_write, _cancel class EventLoop: @@ -66,25 +66,11 @@ class EventLoop: except KeyboardInterrupt: self.running.throw(KeyboardInterrupt) - def spawn(self, coroutine: types.coroutine): - """Schedules a task for execution, appending it to the call stack""" - - task = Task(coroutine, self) - self.to_run.append(task) - return task - - def schedule(self, coroutine: types.coroutine, when: int): - """Schedules a task for execution after n seconds""" - - self.sequence += 1 - task = Task(coroutine, self) - heappush(self.paused, (self.clock() + when, self.sequence, task)) - return task def start(self, coroutine: types.coroutine, *args, **kwargs): """Starts the event loop""" - self.spawn(coroutine(*args, **kwargs)) + self.to_run.append(coroutine(*args, **kwargs)) self.loop() def want_read(self, sock: socket.socket): @@ -107,7 +93,7 @@ class EventLoop: from the socket """ - await want_read(sock) + await _want_read(sock) return sock.recv(buffer) async def accept_sock(self, sock: socket.socket): @@ -115,21 +101,21 @@ class EventLoop: result of the accept() call """ - await want_read(sock) + await _want_read(sock) return sock.accept() async def sock_sendall(self, sock: socket.socket, data: bytes): """Sends all the passed data, as bytes, trough the socket asynchronously""" while data: - await want_write(sock) + await _want_write(sock) sent_no = sock.send(data) data = data[sent_no:] async def close_sock(self, sock: socket.socket): """Closes the socket asynchronously""" - await want_write(sock) + await _want_write(sock) return sock.close() def want_join(self, coro: types.coroutine): @@ -161,7 +147,7 @@ class EventLoop: result = sock.connect(addr) return result except WantWrite: - await want_write(sock) + await _want_write(sock) err = sock.getsockopt(SOL_SOCKET, SO_ERROR) if err != 0: raise OSError(err, f'Connect call failed {addr}') diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 6bd2fcd..dd6ca5c 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -7,14 +7,8 @@ class AlreadyJoinedError(GiambioError): pass -class CancelledError(GiambioError): +class CancelledError(BaseException): """Exception raised as a result of the giambio.core.cancel() method""" def __repr__(self): return "giambio.exceptions.CancelledError" - -class TaskCancelled(GiambioError): - """This exception is raised when the user attempts to join a cancelled task""" - -class TaskFinished(TaskCancelled): - """This exception is raised when the user attempts to join an already finished task""" diff --git a/giambio/traps.py b/giambio/traps.py index 4064f1e..6f4fc5c 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -2,10 +2,10 @@ import types import socket -from .exceptions import TaskCancelled + @types.coroutine -def sleep(seconds: int): +def _sleep(seconds: int): """Pause the execution of a coroutine for the passed amount of seconds, without blocking the entire event loop, which keeps watching for other events @@ -14,55 +14,63 @@ def sleep(seconds: int): enough calls to async methods (or 'checkpoints'), e.g one that needs the 'await' keyword before it, this might affect performance as it would prevent the scheduler from switching tasks properly. If you feel like this happens in your code, try adding a call to giambio.sleep(0); this will act as a checkpoint without - actually pausing the execution of your coroutine""" + actually pausing the execution of your coroutine + + :param seconds: The amount of seconds to sleep for + :type seconds: int + """ yield "want_sleep", seconds @types.coroutine -def want_read(sock: socket.socket): - """'Tells' the event loop that there is some coroutine that wants to read from the passed socket""" +def _want_read(sock: socket.socket): + """'Tells' the event loop that there is some coroutine that wants to read from the passed socket + + :param sock: The socket to perform the operation on + :type sock: class: socket.socket + """ yield "want_read", sock @types.coroutine -def want_write(sock: socket.socket): - """'Tells' the event loop that there is some coroutine that wants to write into the passed socket""" +def _want_write(sock: socket.socket): + """'Tells' the event loop that there is some coroutine that wants to write into the passed socket + + :param sock: The socket to perform the operation on + :type sock: class: socket.socket + """ yield "want_write", sock @types.coroutine -def join(task, silent=False): +def _join(task, silent=False): """'Tells' the scheduler that the desired task MUST be awaited for completion - If silent is True, any exception in the child task will be discarded""" + If silent is True, any exception in the child task will be discarded + + :param task: The task to join + :type task: class: Task + :param silent: If ``True``, any exception raised from the child will be ignored (not recommended), defaults to ``False`` + :type silent: bool, optional + """ - if task.cancelled: - raise TaskCancelled("cannot join cancelled task!") task.joined = True yield "want_join", task - if silent: - try: - return task.get_result() # Exception silenced - except: - return None - else: - return task.get_result() # Will raise + return task.get_result(silent) + @types.coroutine -def cancel(task): - """'Tells' the scheduler that the passed task must be cancelled""" +def _cancel(task): + """'Tells' the scheduler that the passed task must be cancelled + + The concept of cancellation here is tricky, because there is no real way to 'stop' a + running task if not by raising an exception inside it and just ignore whatever the task + returns (and also hoping that the task won't cause damage when exiting abruptly). + It is highly recommended that when you write a coroutine you take into account that it might + be cancelled at any time + """ yield "want_cancel", task -@types.coroutine -def join_unfinished(task): - """Same as join(), but it will raise a TaskFinished exception if the task already ended""" - - if task.execution == "FINISH": - raise TaskFinished("task has already ended!") - yield "want_join", task - task.joined = True - return task.get_result() - diff --git a/giambio/util.py b/giambio/util.py index ec212fb..faa7e38 100644 --- a/giambio/util.py +++ b/giambio/util.py @@ -1,30 +1,41 @@ -from .abstractions import Task from collections import deque +from .exceptions import CancelledError +import types +from .abstractions import Task +from heapq import heappush -class TaskManager(Task): +class TaskManager: """Class to be used inside context managers to spawn multiple tasks and be sure that they will all joined before the code exits the with block""" def __init__(self, loop, silent=False): self.tasks = deque() # All tasks spawned self.values = {} # Results OR exceptions of each task - self.loop = loop - self.silent = silent + self.loop = loop # The event loop that spawned the TaskManager + self.silent = silent # Make exceptions silent? (not recommended) async def __aenter__(self): return self async def __aexit__(self, *args): for task in self.tasks: - self.values[task.coroutine.__name__] = await task.join(self.silent) + if task.cancelled: + self.values[task] = CancelledError() + else: + self.values[task] = await task.join(self.silent) - async def spawn(self, coro): - task = self.loop.spawn(coro) - self.tasks.append(task) + def spawn(self, coroutine: types.coroutine): + """Schedules a task for execution, appending it to the call stack""" + + task = Task(coroutine, self) + self.loop.to_run.append(task) return task - async def schedule(self, coro, delay): - task = self.loop.schedule(coro, delay) - self.tasks.append(task) + def schedule(self, coroutine: types.coroutine, when: int): + """Schedules a task for execution after n seconds""" + + self.loop.sequence += 1 + task = Task(coroutine, self) + heappush(self.loop.paused, (self.loop.clock() + when, self.loop.sequence, task)) return task