diff --git a/experiment.py b/experiment.py index 321c0ae..9963755 100644 --- a/experiment.py +++ b/experiment.py @@ -5,25 +5,29 @@ loop = giambio.EventLoop() """ -What works and what does not +What works and what does not (21st March 2020 10:33 AM) - Run tasks concurrently: V - Join mechanism: V - Sleep mechanism: V -- Cancellation mechanism: V +- Cancellation mechanism: X Note: giambio.exceptions.CancelledError is raised inside the parent task instead of the child one, probably related to some f*ck ups with the value of EventLoop.running, need to investigate - Exception propagation: V - Concurrent I/O: X Note: I/O would work only when a task is joined (weird) - Return values of coroutines: X Note: Return values ARE actually stored in task objects properly, but are messed up later when joining tasks +- Scheduling tasks for future execution: V """ async def countdown(n): - while n > 0: - print(f"Down {n}") - n -= 1 - await giambio.sleep(1) - return "Count DOWN over" - + try: + while n > 0: + print(f"Down {n}") + n -= 1 + await giambio.sleep(1) + print("Countdown over") + return "Count DOWN over" + except CancelledError: + print("countdown cancelled!") async def count(stop, step=1): x = 0 @@ -31,13 +35,16 @@ async def count(stop, step=1): print(f"Up {x}") x += step await giambio.sleep(step) + print("Countup over") return "Count UP over" async def main(): + print("Spawning countdown immediately, scheduling count for 2 secs from now") task = loop.spawn(countdown(8)) - task1 = loop.spawn(count(8, 2)) - print(await giambio.join(task)) - print(await giambio.join(task1)) + task1 = loop.schedule(count(12, 2), 2) + await task.join() + await task1.join() + print("All done") loop.start(main) diff --git a/giambio/core.py b/giambio/core.py index 02a44e7..f6c0756 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -30,7 +30,7 @@ class EventLoop: """Main event loop for giambio""" while True: - if not self.selector.get_map() and not self.to_run: + if not self.selector.get_map() and not any((self.to_run + deque(self.paused))): break while self.selector.get_map(): # If there are sockets ready, (re)schedule their associated task timeout = 0.0 if self.to_run else None @@ -70,6 +70,14 @@ class EventLoop: self.to_run.append(task) return task + def schedule(self, coroutine: types.coroutine, when: int): + """Schedules a task for execution after n seconds""" + + self.sequence += 1 + task = Task(coroutine) + heappush(self.paused, (self.clock() + when, self.sequence, task)) + return task + def start(self, coroutine: types.coroutine, *args, **kwargs): self.spawn(coroutine(*args, **kwargs)) self.loop() diff --git a/giambio/socket.py b/giambio/socket.py index deff7a0..1bd9076 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -50,4 +50,4 @@ class AsyncSocket(object): return self.sock.__exit__(*args) def __repr__(self): - return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})" \ No newline at end of file + return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})" diff --git a/test.py b/test.py index 42d337f..91b19d8 100644 --- a/test.py +++ b/test.py @@ -24,7 +24,7 @@ async def make_srv(address: tuple): conn, addr = await asock.accept() logging.info(f"{addr} connected") task = loop.spawn(echo_server(conn, addr)) - await task.cancel() +# await task.cancel() # Cancel task! async def echo_server(sock: AsyncSocket, addr: tuple): @@ -43,4 +43,4 @@ async def echo_server(sock: AsyncSocket, addr: tuple): logging.info(f"Connection from {addr} closed") -loop.start(make_srv, ('', 1500)) +loop.start(make_srv, ('', 1501))