Fixed some bugs with exceptions and propagations, I/O is broken

This commit is contained in:
nocturn9x 2020-11-27 21:52:45 +01:00
parent 4618c8cc79
commit 2429cbb863
7 changed files with 25 additions and 31 deletions

View File

@ -52,7 +52,6 @@ def sync_fun(): # A regular (sync) function
First of all, async functions like to stick together: to call an async function you need to put `await` in front of it, like below: First of all, async functions like to stick together: to call an async function you need to put `await` in front of it, like below:
```python ```python
async def async_two(): async def async_two():
print("Hello from async_two!") print("Hello from async_two!")

View File

@ -150,6 +150,7 @@ class AsyncScheduler:
self.current_task.cancelled = True self.current_task.cancelled = True
self.current_task.cancel_pending = False self.current_task.cancel_pending = False
self.debugger.after_cancel(self.current_task) self.debugger.after_cancel(self.current_task)
self.join(self.current_task)
# TODO: Do we need to join? # TODO: Do we need to join?
except StopIteration as ret: except StopIteration as ret:
# Coroutine ends # Coroutine ends
@ -172,8 +173,9 @@ class AsyncScheduler:
""" """
task = task or self.current_task task = task or self.current_task
self.debugger.before_cancel(task) if not task.cancelled:
task.throw(CancelledError()) self.debugger.before_cancel(task)
task.throw(CancelledError())
def get_running(self): def get_running(self):
""" """
@ -223,9 +225,9 @@ class AsyncScheduler:
elif self.paused: elif self.paused:
# If there are asleep tasks, wait until the closest deadline # If there are asleep tasks, wait until the closest deadline
timeout = max(0.0, self.paused[0][0] - self.clock()) timeout = max(0.0, self.paused[0][0] - self.clock())
elif self.selector.get_map(): else:
# If there is *only* I/O, we wait a fixed amount of time # If there is *only* I/O, we wait a fixed amount of time
timeout = 1 # TODO: Is this ok? timeout = 1
self.debugger.before_io(timeout) self.debugger.before_io(timeout)
if self.selector.get_map(): if self.selector.get_map():
io_ready = self.selector.select(timeout) io_ready = self.selector.select(timeout)
@ -286,7 +288,7 @@ class AsyncScheduler:
""" """
task.joined = True task.joined = True
if task.finished: if task.finished or task.cancelled:
self.reschedule_joinee(task) self.reschedule_joinee(task)
elif task.exc: elif task.exc:
self.cancel_all() self.cancel_all()
@ -313,7 +315,7 @@ class AsyncScheduler:
task = task or self.current_task task = task or self.current_task
if not task.finished and not task.exc: if not task.finished and not task.exc:
if task.status in ("I/O", "sleep"): if task.status in ("io", "sleep"):
# We cancel right away # We cancel right away
self.do_cancel(task) self.do_cancel(task)
else: else:
@ -342,7 +344,7 @@ class AsyncScheduler:
selector to perform I/0 multiplexing selector to perform I/0 multiplexing
""" """
self.current_task.status = "I/O" self.current_task.status = "io"
if self.current_task.last_io: if self.current_task.last_io:
if self.current_task.last_io == ("READ", sock): if self.current_task.last_io == ("READ", sock):
# Socket is already scheduled! # Socket is already scheduled!
@ -361,7 +363,7 @@ class AsyncScheduler:
selector to perform I/0 multiplexing selector to perform I/0 multiplexing
""" """
self.current_task.status = "I/O" self.current_task.status = "io"
if self.current_task.last_io: if self.current_task.last_io:
if self.current_task.last_io == ("WRITE", sock): if self.current_task.last_io == ("WRITE", sock):
# Socket is already scheduled! # Socket is already scheduled!
@ -387,10 +389,7 @@ class AsyncScheduler:
available and returning up to buffer bytes from the socket available and returning up to buffer bytes from the socket
""" """
try: await want_read(sock)
return sock.recv(buffer)
except WantRead:
await want_read(sock)
return sock.recv(buffer) return sock.recv(buffer)
async def accept_sock(self, sock: socket.socket): async def accept_sock(self, sock: socket.socket):
@ -399,11 +398,8 @@ class AsyncScheduler:
is available and returning the result of the accept() call is available and returning the result of the accept() call
""" """
try: await want_read(sock)
return sock.accept() return sock.accept()
except WantRead:
await want_read(sock)
return sock.accept()
async def sock_sendall(self, sock: socket.socket, data: bytes): async def sock_sendall(self, sock: socket.socket, data: bytes):
""" """
@ -411,11 +407,8 @@ class AsyncScheduler:
""" """
while data: while data:
try: await want_write(sock)
sent_no = sock.send(data) sent_no = sock.send(data)
except WantWrite:
await want_write(sock)
sent_no = sock.send(data)
data = data[sent_no:] data = data[sent_no:]
async def close_sock(self, sock: socket.socket): async def close_sock(self, sock: socket.socket):
@ -425,7 +418,8 @@ class AsyncScheduler:
await want_write(sock) await want_write(sock)
self.selector.unregister(sock) self.selector.unregister(sock)
return sock.close() self.current_task.last_io = ()
sock.close()
async def connect_sock(self, sock: socket.socket, addr: tuple): async def connect_sock(self, sock: socket.socket, addr: tuple):
""" """

View File

@ -33,10 +33,10 @@ class InternalError(GiambioError):
... ...
class CancelledError(GiambioError): class CancelledError(BaseException):
""" """
Exception raised by the giambio.objects.Task.cancel() method Exception raised by the giambio.objects.Task.cancel() method
to terminate a child task. This should NOT be catched, or to terminate a child task. This should NOT be caught, or
at least it should be re-raised and never ignored at least it should be re-raised and never ignored
""" """

View File

@ -81,6 +81,7 @@ class Task:
def __hash__(self): def __hash__(self):
return hash(self.coroutine) return hash(self.coroutine)
class Event: class Event:
""" """
A class designed similarly to threading.Event A class designed similarly to threading.Event

View File

@ -89,7 +89,7 @@ async def cancel(task):
code, so if you really wanna do that be sure to re-raise it when done! code, so if you really wanna do that be sure to re-raise it when done!
""" """
await create_trap("cancel") await create_trap("cancel", task)
assert task.cancelled, f"Coroutine ignored CancelledError" assert task.cancelled, f"Coroutine ignored CancelledError"

View File

@ -3,8 +3,6 @@ from giambio.socket import AsyncSocket
import socket import socket
import logging import logging
import sys import sys
import traceback
# A test to check for asynchronous I/O # A test to check for asynchronous I/O
@ -52,4 +50,5 @@ if __name__ == "__main__":
if isinstance(error, KeyboardInterrupt): if isinstance(error, KeyboardInterrupt):
logging.info("Ctrl+C detected, exiting") logging.info("Ctrl+C detected, exiting")
else: else:
raise
logging.error(f"Exiting due to a {type(error).__name__}: {error}") logging.error(f"Exiting due to a {type(error).__name__}: {error}")

View File

@ -67,9 +67,10 @@ async def main():
pool.spawn(child1) pool.spawn(child1)
print("[main] Children spawned, awaiting completion") print("[main] Children spawned, awaiting completion")
except Exception as error: except Exception as error:
print(f"[main] Exception from child catched! {repr(error)}") # Because exceptions just *work*!
print(f"[main] Exception from child caught! {repr(error)}")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__": if __name__ == "__main__":
giambio.run(main, debugger=None) giambio.run(main)