Fixed async I/O and added echo server test
This commit is contained in:
parent
1530b54fc5
commit
cca2ec15a5
|
@ -364,8 +364,7 @@ class FIFOKernel:
|
||||||
lambda o: o.fileobj == stream,
|
lambda o: o.fileobj == stream,
|
||||||
dict(self.selector.get_map()).values(),
|
dict(self.selector.get_map()).values(),
|
||||||
):
|
):
|
||||||
for task in k.data:
|
self.handle_task_run(partial(k.data.throw, exc), k.data)
|
||||||
self.handle_task_run(partial(task.throw, exc), task)
|
|
||||||
|
|
||||||
def cancel(self, task: Task):
|
def cancel(self, task: Task):
|
||||||
"""
|
"""
|
||||||
|
@ -553,7 +552,7 @@ class FIFOKernel:
|
||||||
# I/O for the first time
|
# I/O for the first time
|
||||||
self.current_task.last_io = evt_type, resource
|
self.current_task.last_io = evt_type, resource
|
||||||
try:
|
try:
|
||||||
self.selector.register(resource, evt_type, [self.current_task])
|
self.selector.register(resource, evt_type, self.current_task)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# The stream is already being used
|
# The stream is already being used
|
||||||
key = self.selector.get_key(resource)
|
key = self.selector.get_key(resource)
|
||||||
|
@ -561,7 +560,7 @@ class FIFOKernel:
|
||||||
# If the task that registered the stream
|
# If the task that registered the stream
|
||||||
# changed their mind on what they want
|
# changed their mind on what they want
|
||||||
# to do with it, who are we to deny their
|
# to do with it, who are we to deny their
|
||||||
# request? We also modify the the event in
|
# request? We also modify the event in
|
||||||
# our selector so that one task can read
|
# our selector so that one task can read
|
||||||
# off a given stream while another one is
|
# off a given stream while another one is
|
||||||
# writing to it
|
# writing to it
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
import aiosched
|
||||||
|
from debugger import Debugger
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# A test to check for asynchronous I/O
|
||||||
|
|
||||||
|
|
||||||
|
async def serve(bind_address: tuple):
|
||||||
|
"""
|
||||||
|
Serves asynchronously forever
|
||||||
|
|
||||||
|
:param bind_address: The address to bind the server to represented as a tuple
|
||||||
|
(address, port) where address is a string and port is an integer
|
||||||
|
"""
|
||||||
|
|
||||||
|
sock = aiosched.socket.socket()
|
||||||
|
await sock.bind(bind_address)
|
||||||
|
await sock.listen(5)
|
||||||
|
logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}")
|
||||||
|
async with aiosched.with_context() as ctx:
|
||||||
|
async with sock:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
conn, address_tuple = await sock.accept()
|
||||||
|
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
|
||||||
|
await ctx.spawn(handler, conn, address_tuple)
|
||||||
|
except Exception as err:
|
||||||
|
# Because exceptions just *work*
|
||||||
|
logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}")
|
||||||
|
|
||||||
|
|
||||||
|
async def handler(sock: aiosched.socket.AsyncSocket, client_address: tuple):
|
||||||
|
"""
|
||||||
|
Handles a single client connection
|
||||||
|
|
||||||
|
:param sock: The AsyncSocket object connected to the client
|
||||||
|
:param client_address: The client's address represented as a tuple
|
||||||
|
(address, port) where address is a string and port is an integer
|
||||||
|
:type client_address: tuple
|
||||||
|
"""
|
||||||
|
|
||||||
|
address = f"{client_address[0]}:{client_address[1]}"
|
||||||
|
async with sock: # Closes the socket automatically
|
||||||
|
await sock.send_all(b"Welcome to the server pal, feel free to send me something!\n")
|
||||||
|
while True:
|
||||||
|
await sock.send_all(b"-> ")
|
||||||
|
data = await sock.receive(1024)
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
|
elif data == b"exit\n":
|
||||||
|
await sock.send_all(b"I'm dead dude\n")
|
||||||
|
raise TypeError("Oh, no, I'm gonna die!")
|
||||||
|
logging.info(f"Got: {data!r} from {address}")
|
||||||
|
await sock.send_all(b"Got: " + data)
|
||||||
|
logging.info(f"Echoed back {data!r} to {address}")
|
||||||
|
logging.info(f"Connection from {address} closed")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
port = int(sys.argv[1]) if len(sys.argv) > 1 else 1501
|
||||||
|
logging.basicConfig(
|
||||||
|
level=20,
|
||||||
|
format="[%(levelname)s] %(asctime)s %(message)s",
|
||||||
|
datefmt="%d/%m/%Y %H:%M:%S %p",
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
aiosched.run(serve, ("localhost", port), debugger=())
|
||||||
|
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
|
||||||
|
if isinstance(error, KeyboardInterrupt):
|
||||||
|
logging.info("Ctrl+C detected, exiting")
|
||||||
|
else:
|
||||||
|
logging.error(f"Exiting due to a {type(error).__name__}: {error}")
|
Reference in New Issue