diff --git a/giambio/_core.py b/giambio/_core.py index da5c599..a642037 100644 --- a/giambio/_core.py +++ b/giambio/_core.py @@ -57,13 +57,15 @@ class AsyncScheduler: self.sequence = 0 def run(self): - """Starts the loop and 'listens' for events until there are either ready or asleep tasks + """ + Starts the loop and 'listens' for events until there are either ready or asleep tasks, then exit. This behavior kinda reflects a kernel, as coroutines can request the loop's functionality only trough some fixed entry points, which in turn yield and - give execution control to the loop itself.""" + give execution control to the loop itself. + """ - try: - while True: + while True: + try: if not self.selector.get_map() and not any( [self.paused, self.tasks, self.event_waiting] ): # If there is nothing to do, just exit @@ -74,7 +76,7 @@ class AsyncScheduler: ): # If there are no actively running tasks, we try to schedule the asleep ones self.check_sleeping() if self.selector.get_map(): - self.check_io() + self.check_io() # The next step is checking for I/O while self.tasks: # While there are tasks to run self.current_task = ( self.tasks.popleft() @@ -91,20 +93,22 @@ class AsyncScheduler: ) # Sneaky method call, thanks to David Beazley for this ;) if self.event_waiting: self.check_events() - except CancelledError as cancelled: - self.tasks.remove(cancelled.args[0]) # Remove the dead task - self.tasks.append(self.current_task) - except StopIteration as e: # Coroutine ends - self.current_task.result = e.args[0] if e.args else None - self.current_task.finished = True - self.reschedule_parent(self.current_task) - except BaseException as error: # Coroutine raised - self.current_task.exc = error - self.reschedule_parent(self.current_task) - self.join(self.current_task) + except CancelledError as cancelled: + self.tasks.remove(cancelled.args[0]) # Remove the dead task + self.tasks.append(self.current_task) + except StopIteration as e: # Coroutine ends + self.current_task.result = e.args[0] if e.args else None + self.current_task.finished = True + self.reschedule_parent(self.current_task) + except BaseException as error: # Coroutine raised + self.current_task.exc = error + self.reschedule_parent(self.current_task) + self.join(self.current_task) def check_events(self): - """Checks for ready or expired events and triggers them""" + """ + Checks for ready or expired events and triggers them + """ for event, tasks in self.event_waiting.copy().items(): if event._set: @@ -115,7 +119,9 @@ class AsyncScheduler: self.event_waiting.pop(event) def check_sleeping(self): - """Checks and reschedules sleeping tasks""" + """ + Checks and reschedules sleeping tasks + """ wait( max(0.0, self.paused[0][0] - self.clock()) @@ -128,7 +134,9 @@ class AsyncScheduler: break def check_io(self): - """Checks and schedules task to perform I/O""" + """ + Checks and schedules task to perform I/O + """ timeout = ( 0.0 if self.tasks else None @@ -140,21 +148,26 @@ class AsyncScheduler: self.tasks.append(key.data) # Socket ready? Schedule the task def create_task(self, coro: types.coroutine): - """Spawns a child task""" + """ + Spawns a child task + """ task = Task(coro) self.tasks.append(task) return task def schedule_task(self, coro: types.coroutine, n: int): - """Schedules a task for execution after n seconds""" + """ + Schedules a task for execution after n seconds + """ task = Task(coro) self.paused.put(task, n) return task def start(self, coro: types.coroutine): - """Starts the event loop using a coroutine as an entry point. + """ + Starts the event loop using a coroutine as an entry point. """ entry = self.create_task(coro) @@ -165,19 +178,24 @@ class AsyncScheduler: entry.exc = exc crashed = True if crashed: - raise GiambioError("Event loop crashed!") from entry.exc + raise entry.exc return entry def reschedule_parent(self, coro): - """Reschedules the parent task""" + """ + Reschedules the parent task + """ parent = self.joined.pop(coro, None) if parent: self.tasks.append(parent) return parent + # TODO: More generic I/O rather than just sockets def want_read(self, sock: socket.socket): - """Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing""" + """ + Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing + """ self.current_task.status = "I/O" if self.current_task._last_io: @@ -195,7 +213,9 @@ class AsyncScheduler: raise ResourceBusy("The given resource is busy!") def want_write(self, sock: socket.socket): - """Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing""" + """ + Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing + """ self.current_task.status = "I/O" if self.current_task._last_io: @@ -213,10 +233,12 @@ class AsyncScheduler: raise ResourceBusy("The given resource is busy!") def join(self, child: types.coroutine): - """Handler for the 'join' event, does some magic to tell the scheduler + """ + Handler for the 'join' event, does some magic to tell the scheduler to wait until the passed coroutine ends. The result of this call equals whatever the coroutine returns or, if an exception gets raised, the exception will get propagated inside the - parent task""" + parent task + """ if child.cancelled: # Task was cancelled and is therefore dead self.tasks.append(self.current_task) @@ -234,7 +256,9 @@ class AsyncScheduler: ) def sleep(self, seconds: int or float): - """Puts the caller to sleep for a given amount of seconds""" + """ + Puts the caller to sleep for a given amount of seconds + """ if seconds: self.current_task.status = "sleep" @@ -243,7 +267,9 @@ class AsyncScheduler: self.tasks.append(self.current_task) def event_set(self, event, value): - """Sets an event""" + """ + Sets an event + """ event.notifier = self.current_task event._set = True @@ -251,7 +277,9 @@ class AsyncScheduler: self.events[event] = value def event_wait(self, event): - """Waits for an event""" + """ + Waits for an event + """ if self.events.get(event, None): event.waiting -= 1 @@ -263,9 +291,11 @@ class AsyncScheduler: self.event_waiting[event].append(self.current_task) def cancel(self, task): - """Handler for the 'cancel' event, throws CancelledError inside a coroutine + """ + Handler for the 'cancel' event, throws CancelledError inside a coroutine in order to stop it from executing. The loop continues to execute as tasks - are independent""" + are independent + """ if ( task.status in ("sleep", "I/O") and not task.cancelled @@ -276,12 +306,15 @@ class AsyncScheduler: task.status = "cancel" # Cancellation is deferred def wrap_socket(self, sock): - """Wraps a standard socket into an AsyncSocket object""" + """ + Wraps a standard socket into an AsyncSocket object + """ return AsyncSocket(sock, self) async def read_sock(self, sock: socket.socket, buffer: int): - """Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes + """ + Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes from the socket """ @@ -289,7 +322,8 @@ class AsyncScheduler: return sock.recv(buffer) async def accept_sock(self, sock: socket.socket): - """Accepts a socket connection asynchronously, waiting until the resource is available and returning the + """ + Accepts a socket connection asynchronously, waiting until the resource is available and returning the result of the accept() call """ @@ -297,7 +331,9 @@ class AsyncScheduler: return sock.accept() async def sock_sendall(self, sock: socket.socket, data: bytes): - """Sends all the passed data, as bytes, trough the socket asynchronously""" + """ + Sends all the passed data, as bytes, trough the socket asynchronously + """ while data: await want_write(sock) @@ -305,13 +341,17 @@ class AsyncScheduler: data = data[sent_no:] async def close_sock(self, sock: socket.socket): - """Closes the socket asynchronously""" + """ + Closes the socket asynchronously + """ await want_write(sock) return sock.close() async def connect_sock(self, sock: socket.socket, addr: tuple): - """Connects a socket asynchronously""" + """ + Connects a socket asynchronously + """ try: # "Borrowed" from curio return sock.connect(addr) diff --git a/giambio/_layers.py b/giambio/_layers.py index 44f8350..f6a503c 100644 --- a/giambio/_layers.py +++ b/giambio/_layers.py @@ -74,7 +74,7 @@ class Event: async def set(self, value=True): """Sets the event, optionally taking a value. This can be used - to control tasks' flow by 'sending' commands back and fort""" + to control tasks' flow by 'sending' commands back and fort""" if self._set: raise GiambioError("The event has already been set") @@ -89,7 +89,7 @@ class Event: class TimeQueue: """An abstraction layer over a heap queue based on time. This is where - sleeping tasks will be put when they are asleep""" + sleeping tasks will be put when they are asleep""" def __init__(self, clock): self.clock = clock diff --git a/giambio/_traps.py b/giambio/_traps.py index 8f8d5a9..b58d461 100644 --- a/giambio/_traps.py +++ b/giambio/_traps.py @@ -46,8 +46,8 @@ def sleep(seconds: int): def join(task): """'Tells' the scheduler that the desired task MUST be awaited for completion - :param task: The task to join - :type task: class: Task + :param task: The task to join + :type task: class: Task """ res = yield "join", task @@ -58,11 +58,11 @@ def join(task): def cancel(task): """'Tells' the scheduler that the passed task must be cancelled - The concept of cancellation here is tricky, because there is no real way to 'stop' a - running task if not by raising an exception inside it and just ignore whatever the task - returns (and also hoping that the task won't cause damage when exiting abruptly). - It is highly recommended that when you write a coroutine you take into account that it might - be cancelled at any time + The concept of cancellation here is tricky, because there is no real way to 'stop' a + running task if not by raising an exception inside it and just ignore whatever the task + returns (and also hoping that the task won't cause damage when exiting abruptly). + It is highly recommended that when you write a coroutine you take into account that it might + be cancelled at any time """ yield "cancel", task @@ -73,8 +73,8 @@ def cancel(task): def want_read(sock: socket.socket): """'Tells' the event loop that there is some coroutine that wants to read from the given socket - :param sock: The socket to perform the operation on - :type sock: class: socket.socket + :param sock: The socket to perform the operation on + :type sock: class: socket.socket """ yield "want_read", sock @@ -84,8 +84,8 @@ def want_read(sock: socket.socket): def want_write(sock: socket.socket): """'Tells' the event loop that there is some coroutine that wants to write on the given socket - :param sock: The socket to perform the operation on - :type sock: class: socket.socket + :param sock: The socket to perform the operation on + :type sock: class: socket.socket """ yield "want_write", sock @@ -94,8 +94,8 @@ def want_write(sock: socket.socket): @types.coroutine def event_set(event, value): """Communicates to the loop that the given event object - must be set. This is important as the loop constantly - checks for active events to deliver them + must be set. This is important as the loop constantly + checks for active events to deliver them """ yield "event_set", event, value @@ -104,7 +104,7 @@ def event_set(event, value): @types.coroutine def event_wait(event): """Notifies the event loop that the current task has to wait - for the event to trigger + for the event to trigger """ msg = yield "event_wait", event diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 3bbb35c..e9e6580 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -34,7 +34,7 @@ class CancelledError(BaseException): class ResourceBusy(GiambioError): """Exception that is raised when a resource is accessed by more than - one task at a time""" + one task at a time""" pass diff --git a/tests/server.py b/tests/server.py index cf96515..869c9fa 100644 --- a/tests/server.py +++ b/tests/server.py @@ -41,5 +41,5 @@ async def echo_handler(sock: AsyncSocket, addr: tuple): if __name__ == "__main__": try: sched.start(server(("", 25001))) - except giambio.exceptions.GiambioError as wrapper: # Exceptions propagate! - print(f"Exiting due to a {type(wrapper.__cause__).__name__}") + except KeyboardInterrupt: # Exceptions propagate! + print("Exiting...")