Fixed some stuff so the examples actually work

This commit is contained in:
nocturn9x 2020-11-12 23:10:54 +01:00
parent 8b095fbf72
commit 3c9421c84c
5 changed files with 98 additions and 58 deletions

View File

@ -57,13 +57,15 @@ class AsyncScheduler:
self.sequence = 0
def run(self):
"""Starts the loop and 'listens' for events until there are either ready or asleep tasks
"""
Starts the loop and 'listens' for events until there are either ready or asleep tasks,
then exit. This behavior kinda reflects a kernel, as coroutines can request
the loop's functionality only trough some fixed entry points, which in turn yield and
give execution control to the loop itself."""
give execution control to the loop itself.
"""
try:
while True:
try:
if not self.selector.get_map() and not any(
[self.paused, self.tasks, self.event_waiting]
): # If there is nothing to do, just exit
@ -74,7 +76,7 @@ class AsyncScheduler:
): # If there are no actively running tasks, we try to schedule the asleep ones
self.check_sleeping()
if self.selector.get_map():
self.check_io()
self.check_io() # The next step is checking for I/O
while self.tasks: # While there are tasks to run
self.current_task = (
self.tasks.popleft()
@ -104,7 +106,9 @@ class AsyncScheduler:
self.join(self.current_task)
def check_events(self):
"""Checks for ready or expired events and triggers them"""
"""
Checks for ready or expired events and triggers them
"""
for event, tasks in self.event_waiting.copy().items():
if event._set:
@ -115,7 +119,9 @@ class AsyncScheduler:
self.event_waiting.pop(event)
def check_sleeping(self):
"""Checks and reschedules sleeping tasks"""
"""
Checks and reschedules sleeping tasks
"""
wait(
max(0.0, self.paused[0][0] - self.clock())
@ -128,7 +134,9 @@ class AsyncScheduler:
break
def check_io(self):
"""Checks and schedules task to perform I/O"""
"""
Checks and schedules task to perform I/O
"""
timeout = (
0.0 if self.tasks else None
@ -140,21 +148,26 @@ class AsyncScheduler:
self.tasks.append(key.data) # Socket ready? Schedule the task
def create_task(self, coro: types.coroutine):
"""Spawns a child task"""
"""
Spawns a child task
"""
task = Task(coro)
self.tasks.append(task)
return task
def schedule_task(self, coro: types.coroutine, n: int):
"""Schedules a task for execution after n seconds"""
"""
Schedules a task for execution after n seconds
"""
task = Task(coro)
self.paused.put(task, n)
return task
def start(self, coro: types.coroutine):
"""Starts the event loop using a coroutine as an entry point.
"""
Starts the event loop using a coroutine as an entry point.
"""
entry = self.create_task(coro)
@ -165,19 +178,24 @@ class AsyncScheduler:
entry.exc = exc
crashed = True
if crashed:
raise GiambioError("Event loop crashed!") from entry.exc
raise entry.exc
return entry
def reschedule_parent(self, coro):
"""Reschedules the parent task"""
"""
Reschedules the parent task
"""
parent = self.joined.pop(coro, None)
if parent:
self.tasks.append(parent)
return parent
# TODO: More generic I/O rather than just sockets
def want_read(self, sock: socket.socket):
"""Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing"""
"""
Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing
"""
self.current_task.status = "I/O"
if self.current_task._last_io:
@ -195,7 +213,9 @@ class AsyncScheduler:
raise ResourceBusy("The given resource is busy!")
def want_write(self, sock: socket.socket):
"""Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing"""
"""
Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing
"""
self.current_task.status = "I/O"
if self.current_task._last_io:
@ -213,10 +233,12 @@ class AsyncScheduler:
raise ResourceBusy("The given resource is busy!")
def join(self, child: types.coroutine):
"""Handler for the 'join' event, does some magic to tell the scheduler
"""
Handler for the 'join' event, does some magic to tell the scheduler
to wait until the passed coroutine ends. The result of this call equals whatever the
coroutine returns or, if an exception gets raised, the exception will get propagated inside the
parent task"""
parent task
"""
if child.cancelled: # Task was cancelled and is therefore dead
self.tasks.append(self.current_task)
@ -234,7 +256,9 @@ class AsyncScheduler:
)
def sleep(self, seconds: int or float):
"""Puts the caller to sleep for a given amount of seconds"""
"""
Puts the caller to sleep for a given amount of seconds
"""
if seconds:
self.current_task.status = "sleep"
@ -243,7 +267,9 @@ class AsyncScheduler:
self.tasks.append(self.current_task)
def event_set(self, event, value):
"""Sets an event"""
"""
Sets an event
"""
event.notifier = self.current_task
event._set = True
@ -251,7 +277,9 @@ class AsyncScheduler:
self.events[event] = value
def event_wait(self, event):
"""Waits for an event"""
"""
Waits for an event
"""
if self.events.get(event, None):
event.waiting -= 1
@ -263,9 +291,11 @@ class AsyncScheduler:
self.event_waiting[event].append(self.current_task)
def cancel(self, task):
"""Handler for the 'cancel' event, throws CancelledError inside a coroutine
"""
Handler for the 'cancel' event, throws CancelledError inside a coroutine
in order to stop it from executing. The loop continues to execute as tasks
are independent"""
are independent
"""
if (
task.status in ("sleep", "I/O") and not task.cancelled
@ -276,12 +306,15 @@ class AsyncScheduler:
task.status = "cancel" # Cancellation is deferred
def wrap_socket(self, sock):
"""Wraps a standard socket into an AsyncSocket object"""
"""
Wraps a standard socket into an AsyncSocket object
"""
return AsyncSocket(sock, self)
async def read_sock(self, sock: socket.socket, buffer: int):
"""Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes
"""
Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes
from the socket
"""
@ -289,7 +322,8 @@ class AsyncScheduler:
return sock.recv(buffer)
async def accept_sock(self, sock: socket.socket):
"""Accepts a socket connection asynchronously, waiting until the resource is available and returning the
"""
Accepts a socket connection asynchronously, waiting until the resource is available and returning the
result of the accept() call
"""
@ -297,7 +331,9 @@ class AsyncScheduler:
return sock.accept()
async def sock_sendall(self, sock: socket.socket, data: bytes):
"""Sends all the passed data, as bytes, trough the socket asynchronously"""
"""
Sends all the passed data, as bytes, trough the socket asynchronously
"""
while data:
await want_write(sock)
@ -305,13 +341,17 @@ class AsyncScheduler:
data = data[sent_no:]
async def close_sock(self, sock: socket.socket):
"""Closes the socket asynchronously"""
"""
Closes the socket asynchronously
"""
await want_write(sock)
return sock.close()
async def connect_sock(self, sock: socket.socket, addr: tuple):
"""Connects a socket asynchronously"""
"""
Connects a socket asynchronously
"""
try: # "Borrowed" from curio
return sock.connect(addr)

View File

@ -41,5 +41,5 @@ async def echo_handler(sock: AsyncSocket, addr: tuple):
if __name__ == "__main__":
try:
sched.start(server(("", 25001)))
except giambio.exceptions.GiambioError as wrapper: # Exceptions propagate!
print(f"Exiting due to a {type(wrapper.__cause__).__name__}")
except KeyboardInterrupt: # Exceptions propagate!
print("Exiting...")