From f9eb329ec9717758d6a781398c0c82117b0d7885 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Wed, 25 Mar 2020 22:36:17 +0000 Subject: [PATCH] Many fixes, many TODOs added --- experiment.py | 27 +++++++++++++++------------ giambio/core.py | 13 +++++++------ giambio/traps.py | 1 + giambio/util.py | 18 ++++++++++++------ 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/experiment.py b/experiment.py index 92aeeb7..ccd2dec 100644 --- a/experiment.py +++ b/experiment.py @@ -4,17 +4,17 @@ loop = giambio.EventLoop() """ -What works and what does not (23rd March 2020 23:24 PM) +What works and what does not (25th March 2020 20:35) - Run tasks concurrently: V - Join mechanism: V - Sleep mechanism: V -- Cancellation mechanism: X Note: Figure out how to rescheule parent task +- Cancellation mechanism: V - Exception propagation: V - Concurrent I/O: V - Return values of coroutines: V - Scheduling tasks for future execution: V -- Task Spawner (context manager): X Note: Not Implemented +- Task Spawner (context manager): V """ @@ -28,7 +28,7 @@ async def countdown(n): return "Count DOWN over" except giambio.CancelledError: print("countdown cancelled!") - + raise Exception("Oh no!") #TODO Propagate this async def count(stop, step=1): try: @@ -44,13 +44,16 @@ async def count(stop, step=1): async def main(): - print("Spawning countdown immediately, scheduling count for 4 secs from now") - async with giambio.TaskManager(loop) as manager: - task = await manager.spawn(countdown(4)) - await manager.schedule(count(8, 2), 4) - await task.cancel() - for task, ret in manager.values.items(): - print(f"Function '{task.coroutine.__name__}' at {hex(id(task.coroutine))} returned an object of type '{type(ret).__name__}': {repr(ret)}") - + try: + print("Spawning countdown immediately, scheduling count for 4 secs from now") + async with giambio.TaskManager(loop) as manager: + task = manager.spawn(countdown(4)) + manager.schedule(count(8, 2), 4) + await task.cancel() + for task, ret in manager.values.items(): + print(f"Function '{task.coroutine.__name__}' at {hex(id(task.coroutine))} returned an object of type '{type(ret).__name__}': {repr(ret)}") + except: # TODO: Fix this, see above + pass loop.start(main) + diff --git a/giambio/core.py b/giambio/core.py index aacf1ca..e1d8685 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -10,6 +10,7 @@ from .socket import AsyncSocket, WantRead, WantWrite from .abstractions import Task, Result from socket import SOL_SOCKET, SO_ERROR from .traps import _join, _sleep, _want_read, _want_write, _cancel +from .util import TaskManager class EventLoop: @@ -59,10 +60,8 @@ class EventLoop: self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task except Exception as has_raised: self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task - if self.running.joined: # Let the join function handle the hassle of propagating the error - self.running.result = Result(None, has_raised) # Save the exception - else: # Let the exception propagate (I'm looking at you asyncIO ;)) - raise + self.running.result = Result(None, has_raised) # Save the exception + raise except KeyboardInterrupt: self.running.throw(KeyboardInterrupt) @@ -70,7 +69,7 @@ class EventLoop: def start(self, coroutine: types.coroutine, *args, **kwargs): """Starts the event loop""" - self.to_run.append(coroutine(*args, **kwargs)) + TaskManager(self).spawn(coroutine(*args, **kwargs)) self.loop() def want_read(self, sock: socket.socket): @@ -124,6 +123,7 @@ class EventLoop: coroutine returns or, if an exception gets raised, the exception will get propagated inside the parent task""" + if coro not in self.joined: self.joined[coro].append(self.running) else: @@ -137,11 +137,12 @@ class EventLoop: self.to_run.append(self.running) # Reschedule the task that called sleep def want_cancel(self, task): - task.cancelled = True self.to_run.extend(self.joined.pop(self.running, ())) self.to_run.append(self.running) # Reschedules the parent task +# task.cancelled = True task.throw(CancelledError()) + async def connect_sock(self, sock: socket.socket, addr: tuple): try: # "Borrowed" from curio result = sock.connect(addr) diff --git a/giambio/traps.py b/giambio/traps.py index 6f4fc5c..2ccdf76 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -72,5 +72,6 @@ def _cancel(task): be cancelled at any time """ + task.cancelled = True yield "want_cancel", task diff --git a/giambio/util.py b/giambio/util.py index faa7e38..87aec07 100644 --- a/giambio/util.py +++ b/giambio/util.py @@ -18,18 +18,23 @@ class TaskManager: async def __aenter__(self): return self - async def __aexit__(self, *args): - for task in self.tasks: - if task.cancelled: - self.values[task] = CancelledError() - else: - self.values[task] = await task.join(self.silent) + async def __aexit__(self, type, value, traceback): + if type: + # TODO: Handle exceptions here + ... + else: + for task in self.tasks: + if not task.cancelled: + self.values[task] = await task.join() + else: + self.values[task] = CancelledError() def spawn(self, coroutine: types.coroutine): """Schedules a task for execution, appending it to the call stack""" task = Task(coroutine, self) self.loop.to_run.append(task) + self.tasks.append(task) return task def schedule(self, coroutine: types.coroutine, when: int): @@ -37,5 +42,6 @@ class TaskManager: self.loop.sequence += 1 task = Task(coroutine, self) + self.tasks.append(task) heappush(self.loop.paused, (self.loop.clock() + when, self.loop.sequence, task)) return task