From 2429cbb863115ee23b41f842684dc00e5be995b0 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Fri, 27 Nov 2020 21:52:45 +0100 Subject: [PATCH] Fixed some bugs with exceptions and propagations, I/O is broken --- README.md | 1 - giambio/core.py | 40 +++++++++++++++++----------------------- giambio/exceptions.py | 4 ++-- giambio/objects.py | 1 + giambio/traps.py | 2 +- tests/server.py | 3 +-- tests/sleep.py | 5 +++-- 7 files changed, 25 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index aef1cc9..575f7ad 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,6 @@ def sync_fun(): # A regular (sync) function First of all, async functions like to stick together: to call an async function you need to put `await` in front of it, like below: ```python - async def async_two(): print("Hello from async_two!") diff --git a/giambio/core.py b/giambio/core.py index 6dc1db6..6ba735b 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -150,6 +150,7 @@ class AsyncScheduler: self.current_task.cancelled = True self.current_task.cancel_pending = False self.debugger.after_cancel(self.current_task) + self.join(self.current_task) # TODO: Do we need to join? except StopIteration as ret: # Coroutine ends @@ -172,8 +173,9 @@ class AsyncScheduler: """ task = task or self.current_task - self.debugger.before_cancel(task) - task.throw(CancelledError()) + if not task.cancelled: + self.debugger.before_cancel(task) + task.throw(CancelledError()) def get_running(self): """ @@ -223,9 +225,9 @@ class AsyncScheduler: elif self.paused: # If there are asleep tasks, wait until the closest deadline timeout = max(0.0, self.paused[0][0] - self.clock()) - elif self.selector.get_map(): + else: # If there is *only* I/O, we wait a fixed amount of time - timeout = 1 # TODO: Is this ok? + timeout = 1 self.debugger.before_io(timeout) if self.selector.get_map(): io_ready = self.selector.select(timeout) @@ -286,7 +288,7 @@ class AsyncScheduler: """ task.joined = True - if task.finished: + if task.finished or task.cancelled: self.reschedule_joinee(task) elif task.exc: self.cancel_all() @@ -313,7 +315,7 @@ class AsyncScheduler: task = task or self.current_task if not task.finished and not task.exc: - if task.status in ("I/O", "sleep"): + if task.status in ("io", "sleep"): # We cancel right away self.do_cancel(task) else: @@ -342,7 +344,7 @@ class AsyncScheduler: selector to perform I/0 multiplexing """ - self.current_task.status = "I/O" + self.current_task.status = "io" if self.current_task.last_io: if self.current_task.last_io == ("READ", sock): # Socket is already scheduled! @@ -361,7 +363,7 @@ class AsyncScheduler: selector to perform I/0 multiplexing """ - self.current_task.status = "I/O" + self.current_task.status = "io" if self.current_task.last_io: if self.current_task.last_io == ("WRITE", sock): # Socket is already scheduled! @@ -387,10 +389,7 @@ class AsyncScheduler: available and returning up to buffer bytes from the socket """ - try: - return sock.recv(buffer) - except WantRead: - await want_read(sock) + await want_read(sock) return sock.recv(buffer) async def accept_sock(self, sock: socket.socket): @@ -399,11 +398,8 @@ class AsyncScheduler: is available and returning the result of the accept() call """ - try: - return sock.accept() - except WantRead: - await want_read(sock) - return sock.accept() + await want_read(sock) + return sock.accept() async def sock_sendall(self, sock: socket.socket, data: bytes): """ @@ -411,11 +407,8 @@ class AsyncScheduler: """ while data: - try: - sent_no = sock.send(data) - except WantWrite: - await want_write(sock) - sent_no = sock.send(data) + await want_write(sock) + sent_no = sock.send(data) data = data[sent_no:] async def close_sock(self, sock: socket.socket): @@ -425,7 +418,8 @@ class AsyncScheduler: await want_write(sock) self.selector.unregister(sock) - return sock.close() + self.current_task.last_io = () + sock.close() async def connect_sock(self, sock: socket.socket, addr: tuple): """ diff --git a/giambio/exceptions.py b/giambio/exceptions.py index a59f79e..e66b7a6 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -33,10 +33,10 @@ class InternalError(GiambioError): ... -class CancelledError(GiambioError): +class CancelledError(BaseException): """ Exception raised by the giambio.objects.Task.cancel() method - to terminate a child task. This should NOT be catched, or + to terminate a child task. This should NOT be caught, or at least it should be re-raised and never ignored """ diff --git a/giambio/objects.py b/giambio/objects.py index 333c8c7..dee93d0 100644 --- a/giambio/objects.py +++ b/giambio/objects.py @@ -81,6 +81,7 @@ class Task: def __hash__(self): return hash(self.coroutine) + class Event: """ A class designed similarly to threading.Event diff --git a/giambio/traps.py b/giambio/traps.py index a1a51be..e601205 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -89,7 +89,7 @@ async def cancel(task): code, so if you really wanna do that be sure to re-raise it when done! """ - await create_trap("cancel") + await create_trap("cancel", task) assert task.cancelled, f"Coroutine ignored CancelledError" diff --git a/tests/server.py b/tests/server.py index bb90afd..255a94f 100644 --- a/tests/server.py +++ b/tests/server.py @@ -3,8 +3,6 @@ from giambio.socket import AsyncSocket import socket import logging import sys -import traceback - # A test to check for asynchronous I/O @@ -52,4 +50,5 @@ if __name__ == "__main__": if isinstance(error, KeyboardInterrupt): logging.info("Ctrl+C detected, exiting") else: + raise logging.error(f"Exiting due to a {type(error).__name__}: {error}") diff --git a/tests/sleep.py b/tests/sleep.py index d4bb8e4..671834a 100644 --- a/tests/sleep.py +++ b/tests/sleep.py @@ -67,9 +67,10 @@ async def main(): pool.spawn(child1) print("[main] Children spawned, awaiting completion") except Exception as error: - print(f"[main] Exception from child catched! {repr(error)}") + # Because exceptions just *work*! + print(f"[main] Exception from child caught! {repr(error)}") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") if __name__ == "__main__": - giambio.run(main, debugger=None) + giambio.run(main)