From cca2ec15a531c3f2662641757b1ca1dacbb06047 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Wed, 2 Nov 2022 09:34:36 +0100 Subject: [PATCH] Fixed async I/O and added echo server test --- aiosched/kernel.py | 7 ++--- tests/echo_server.py | 73 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 tests/echo_server.py diff --git a/aiosched/kernel.py b/aiosched/kernel.py index 7e96807..d4d56ff 100644 --- a/aiosched/kernel.py +++ b/aiosched/kernel.py @@ -364,8 +364,7 @@ class FIFOKernel: lambda o: o.fileobj == stream, dict(self.selector.get_map()).values(), ): - for task in k.data: - self.handle_task_run(partial(task.throw, exc), task) + self.handle_task_run(partial(k.data.throw, exc), k.data) def cancel(self, task: Task): """ @@ -553,7 +552,7 @@ class FIFOKernel: # I/O for the first time self.current_task.last_io = evt_type, resource try: - self.selector.register(resource, evt_type, [self.current_task]) + self.selector.register(resource, evt_type, self.current_task) except KeyError: # The stream is already being used key = self.selector.get_key(resource) @@ -561,7 +560,7 @@ class FIFOKernel: # If the task that registered the stream # changed their mind on what they want # to do with it, who are we to deny their - # request? We also modify the the event in + # request? We also modify the event in # our selector so that one task can read # off a given stream while another one is # writing to it diff --git a/tests/echo_server.py b/tests/echo_server.py new file mode 100644 index 0000000..2000948 --- /dev/null +++ b/tests/echo_server.py @@ -0,0 +1,73 @@ +import aiosched +from debugger import Debugger +import logging +import sys + +# A test to check for asynchronous I/O + + +async def serve(bind_address: tuple): + """ + Serves asynchronously forever + + :param bind_address: The address to bind the server to represented as a tuple + (address, port) where address is a string and port is an integer + """ + + sock = aiosched.socket.socket() + await sock.bind(bind_address) + await sock.listen(5) + logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}") + async with aiosched.with_context() as ctx: + async with sock: + while True: + try: + conn, address_tuple = await sock.accept() + logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected") + await ctx.spawn(handler, conn, address_tuple) + except Exception as err: + # Because exceptions just *work* + logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}") + + +async def handler(sock: aiosched.socket.AsyncSocket, client_address: tuple): + """ + Handles a single client connection + + :param sock: The AsyncSocket object connected to the client + :param client_address: The client's address represented as a tuple + (address, port) where address is a string and port is an integer + :type client_address: tuple + """ + + address = f"{client_address[0]}:{client_address[1]}" + async with sock: # Closes the socket automatically + await sock.send_all(b"Welcome to the server pal, feel free to send me something!\n") + while True: + await sock.send_all(b"-> ") + data = await sock.receive(1024) + if not data: + break + elif data == b"exit\n": + await sock.send_all(b"I'm dead dude\n") + raise TypeError("Oh, no, I'm gonna die!") + logging.info(f"Got: {data!r} from {address}") + await sock.send_all(b"Got: " + data) + logging.info(f"Echoed back {data!r} to {address}") + logging.info(f"Connection from {address} closed") + + +if __name__ == "__main__": + port = int(sys.argv[1]) if len(sys.argv) > 1 else 1501 + logging.basicConfig( + level=20, + format="[%(levelname)s] %(asctime)s %(message)s", + datefmt="%d/%m/%Y %H:%M:%S %p", + ) + try: + aiosched.run(serve, ("localhost", port), debugger=()) + except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! + if isinstance(error, KeyboardInterrupt): + logging.info("Ctrl+C detected, exiting") + else: + logging.error(f"Exiting due to a {type(error).__name__}: {error}")