diff --git a/experiment.py b/experiment.py index a3c3b41..602078b 100644 --- a/experiment.py +++ b/experiment.py @@ -40,11 +40,12 @@ async def count(stop, step=1): async def main(): print("Spawning countdown immediately, scheduling count for 2 secs from now") task = loop.spawn(countdown(8)) - task1 = loop.schedule(count(8, 2), 2) - await giambio.sleep(2) # Wait before cancelling -# await task.cancel() # Cancel the task + task1 = loop.spawn(count(8, 2)) + await giambio.sleep(2) + await task.cancel() result = await task1.join() # Joining multiple tasks still causes problems -# result1 = await task.join() + result1 = await task.join() + print(result, result1) print("All done") loop.start(main) diff --git a/giambio/.core.cover.swp b/giambio/.core.cover.swp new file mode 100644 index 0000000..4b84621 Binary files /dev/null and b/giambio/.core.cover.swp differ diff --git a/giambio/core.py b/giambio/core.py index 0e99057..5fe168f 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -49,16 +49,12 @@ class EventLoop: method, *args = self.running.run() # Sneaky method call, thanks to David Beazley for this ;) getattr(self, method)(*args) except StopIteration as e: # TODO: Fix this return mechanism, it looks like the return value of the child task gets "replaced" by None at some point - self.running.ret_value = e.args[0] if e.args else None # Saves the return value + self.running.result = Result(e.args[0] if e.args else None, None) # Saves the return value self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task - except RuntimeError: - self.running.cancelled = True - self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task - print(self.to_run) except Exception as has_raised: self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task if self.running.joined: # Let the join function handle the hassle of propagating the error - self.running.exception = has_raised # Save the exception + self.running.result = Result(None, has_raised) # Save the exception else: # Let the exception propagate (I'm looking at you asyncIO ;)) raise except KeyboardInterrupt: @@ -67,11 +63,11 @@ class EventLoop: def spawn(self, coroutine: types.coroutine): """Schedules a task for execution, appending it to the call stack""" - task = Task(coroutine) + task = Task(coroutine, self) self.to_run.append(task) return task - def schedule(self, coroutine: types.coroutine, when: int): + def schedule(self, coroutine: types.coroutine, when: int): # TODO: Fix this """Schedules a task for execution after n seconds""" self.sequence += 1 @@ -131,31 +127,50 @@ class EventLoop: heappush(self.paused, (self.clock() + seconds, self.sequence, self.running)) def want_cancel(self, task): - task.coroutine.throw(CancelledError) + self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task + task.cancelled = True + task.throw(CancelledError()) + async def connect_sock(self, sock: socket.socket, addr: tuple): await want_write(sock) return sock.connect(addr) +class Result: + """A wrapper for results of coroutines""" + + def __init__(self, val=None, exc: Exception = None): + self.val = val + self.exc = exc + + def __repr__(self): + return f"giambio.core.Result({self.val}, {self.exc})" + + class Task: """A simple wrapper around a coroutine object""" - def __init__(self, coroutine: types.coroutine): + def __init__(self, coroutine: types.coroutine, loop: EventLoop): self.coroutine = coroutine self.status = False # Not ran yet self.joined = False - self.ret_val = None # Return value is saved here - self.exception = None # If errored, the exception is saved here - self.cancelled = False # When cancelled, this is True + self.result = None # Updated when the coroutine execution ends + self.loop = loop # The EventLoop object that spawned the task def run(self): self.status = True - return self.coroutine.send(None) + try: + return self.coroutine.send(None) + except RuntimeError: + print(self.loop.to_run) def __repr__(self): - return f"giambio.core.Task({self.coroutine}, {self.status}, {self.joined}, {self.ret_val}, {self.exception}, {self.cancelled})" + return f"giambio.core.Task({self.coroutine}, {self.status}, {self.joined}, {self.result})" + + def throw(self, exception: Exception): + return self.coroutine.throw(exception) async def cancel(self): return await cancel(self) @@ -163,6 +178,13 @@ class Task: async def join(self): return await join(self) + def get_result(self): + if self.result: + if self.result.exc: + raise self.result.exc + else: + return self.result.val + @types.coroutine def sleep(seconds: int): @@ -192,20 +214,9 @@ def join(task: Task): task.joined = True yield "want_join", task - if task.exception: - print("Traceback (most recent call last):") - traceback.print_tb(task.exception.__traceback__) - exception_name = type(task.exception).__name__ - if str(task.exception): - print(f"{exception_name}: {task.exception}") - else: - print(task.exception) - raise task.exception - return task.ret_val + return task.get_result() @types.coroutine def cancel(task: Task): yield "want_cancel", task - assert task.cancelled - diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 58b28d7..fc74c6e 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -9,3 +9,6 @@ class AlreadyJoinedError(GiambioError): class CancelledError(GiambioError): """Exception raised as a result of the giambio.core.cancel() method""" + + def __repr__(self): + return "giambio.exceptions.CancelledError"