diff --git a/giambio/__init__.py b/giambio/__init__.py index 9260a7e..c684fa0 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -20,10 +20,11 @@ __author__ = "Nocturn9x aka Isgiambyy" __version__ = (1, 0, 0) -from . import exceptions +from . import exceptions, socket +from .socket import wrap_socket from .traps import sleep, current_task from .objects import Event -from .run import run, clock, wrap_socket, create_pool, get_event_loop, new_event_loop +from .run import run, clock, create_pool, get_event_loop, new_event_loop from .util import debug __all__ = [ diff --git a/giambio/core.py b/giambio/core.py index 343540b..78096e5 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -22,11 +22,9 @@ import socket from time import sleep as wait from timeit import default_timer from .objects import Task, TimeQueue -from socket import SOL_SOCKET, SO_ERROR from .traps import want_read, want_write from .util.debug import BaseDebugger from itertools import chain -from .socket import AsyncSocket, WantWrite, WantRead from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from .exceptions import (InternalError, CancelledError, @@ -34,6 +32,9 @@ from .exceptions import (InternalError, ) +IOInterrupt = (BlockingIOError, InterruptedError) + + class AsyncScheduler: """ A simple asynchronous scheduler implementation. Tries to mimic the threaded @@ -158,6 +159,7 @@ class AsyncScheduler: self.debugger.on_task_exit(self.current_task) self.join(self.current_task) except BaseException as err: + raise # Task raised an exception self.current_task.exc = err self.current_task.status = "crashed" @@ -307,7 +309,14 @@ class AsyncScheduler: """ for to_cancel in chain(self.tasks, self.paused, self.get_event_tasks()): - self.cancel(to_cancel) + try: + self.cancel(to_cancel) + except CancelledError: + # Task was cancelled + self.current_task.status = "cancelled" + self.current_task.cancelled = True + self.current_task.cancel_pending = False + self.debugger.after_cancel(self.current_task) def close(self): """ @@ -315,9 +324,8 @@ class AsyncScheduler: inside it and tearing down any extra machinery """ - # self.cancel_all() - # self.shutdown() - ... + self.cancel_all() + self.shutdown() def join(self, task: Task): """ @@ -404,13 +412,6 @@ class AsyncScheduler: # The socket is already registered doing something else raise ResourceBusy("The given resource is busy!") from None - def wrap_socket(self, sock): - """ - Wraps a standard socket into an AsyncSocket object - """ - - return AsyncSocket(sock, self) - async def read_sock(self, sock: socket.socket, buffer: int): """ Reads from a socket asynchronously, waiting until the resource is @@ -426,8 +427,17 @@ class AsyncScheduler: is available and returning the result of the accept() call """ - await want_read(sock) - return sock.accept() + # TODO: Is this ok? + # This does not feel right because the loop will only + # exit when the socket has been accepted, preventing other + # stuff from running + while True: + try: + return sock.accept() + except IOInterrupt: # Do we need this exception thingy everywhere? + # Some methods have never errored out, but this did and doing + # so seemed to fix the issue, needs investigation + await want_read(sock) async def sock_sendall(self, sock: socket.socket, data: bytes): """ @@ -446,19 +456,14 @@ class AsyncScheduler: """ await want_write(sock) + sock.close() self.selector.unregister(sock) self.current_task.last_io = () - sock.close() async def connect_sock(self, sock: socket.socket, addr: tuple): """ Connects a socket asynchronously """ - try: # "Borrowed" from curio - return sock.connect(addr) - except WantWrite: - await want_write(sock) - err = sock.getsockopt(SOL_SOCKET, SO_ERROR) - if err != 0: - raise OSError(err, f"Connect call failed: {addr}") \ No newline at end of file + await want_write(sock) + return sock.connect(addr) diff --git a/giambio/run.py b/giambio/run.py index 3bdd846..60a328a 100644 --- a/giambio/run.py +++ b/giambio/run.py @@ -16,14 +16,12 @@ See the License for the specific language governing permissions and limitations under the License. """ -import socket import inspect import threading from .core import AsyncScheduler from .exceptions import GiambioError from .context import TaskManager from timeit import default_timer -from .socket import AsyncSocket from .util.debug import BaseDebugger from types import FunctionType @@ -87,14 +85,6 @@ def clock(): return get_event_loop().clock() -def wrap_socket(sock: socket.socket) -> AsyncSocket: - """ - Wraps a synchronous socket into a giambio.socket.AsyncSocket - """ - - return get_event_loop().wrap_socket(sock) - - def create_pool(): """ Creates an async pool diff --git a/giambio/socket.py b/giambio/socket.py index 8e5301f..00f679c 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -16,30 +16,19 @@ See the License for the specific language governing permissions and limitations under the License. """ - +from .run import get_event_loop import socket from .exceptions import ResourceClosed -from .traps import sleep -# Stolen from curio -try: - from ssl import SSLWantReadError, SSLWantWriteError - WantRead = (BlockingIOError, InterruptedError, SSLWantReadError) - WantWrite = (BlockingIOError, InterruptedError, SSLWantWriteError) -except ImportError: - WantRead = (BlockingIOError, InterruptedError) - WantWrite = (BlockingIOError, InterruptedError) - - -class AsyncSocket(object): +class AsyncSocket: """ - Abstraction layer for asynchronous TCP sockets + Abstraction layer for asynchronous sockets """ - def __init__(self, sock: socket.socket, loop): + def __init__(self, sock: socket.socket): self.sock = sock - self.loop = loop + self.loop = get_event_loop() self._closed = False self.sock.setblocking(False) @@ -60,7 +49,7 @@ class AsyncSocket(object): if self._closed: raise ResourceClosed("I/O operation on closed socket") to_wrap = await self.loop.accept_sock(self.sock) - return self.loop.wrap_socket(to_wrap[0]), to_wrap[1] + return wrap_socket(to_wrap[0]), to_wrap[1] async def send_all(self, data: bytes): """ @@ -98,3 +87,11 @@ class AsyncSocket(object): def __repr__(self): return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})" + + +def wrap_socket(sock: socket.socket) -> AsyncSocket: + """ + Wraps a standard socket into an async socket + """ + + return AsyncSocket(sock) \ No newline at end of file diff --git a/tests/server.py b/tests/server.py index 229030f..9b0e42a 100644 --- a/tests/server.py +++ b/tests/server.py @@ -47,6 +47,7 @@ if __name__ == "__main__": try: giambio.run(serve, ("localhost", port)) except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! + raise if isinstance(error, KeyboardInterrupt): logging.info("Ctrl+C detected, exiting") else: