From 1676f3149b4d69975c35ae29da7f7aab6decc42d Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Mon, 6 Jul 2020 20:09:13 +0000 Subject: [PATCH] Small changes, cancellation needs a fix --- README.md | 5 ++--- giambio/_core.py | 51 +++++++++++++++++++++++----------------------- giambio/_layers.py | 3 ++- tests/count.py | 16 +++++++-------- 4 files changed, 38 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index ca205ca..0eeb81a 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ giambio has been designed with simplicity in mind, so this README won't go deep Just to clarify things, giambio does not avoid the Global Interpreter Lock nor it performs any sort of multithreading or multiprocessing (at least by default). Remember that **concurrency is not parallelism**, concurrent tasks will switch back and forth and proceed with their calculations but won't be running independently like they would do if they were forked off to a process pool. That's why it is called concurrency, because multiple tasks **concur** for the same amount of resources. (Which is basically the same thing that happens inside your CPU at a much lower level, because processors run many more tasks than their actual number of cores) -If you read carefully, you might now wonder: _"If a coroutine can call other coroutines, but synchronous functions cannot, how do I enter the async context in the first place?"_. This is done trough a special **synchronous function** (the `start()` method of an `EventLoop` object in our case) which can call asynchronous ones, that **must** be called from a synchronous context to avoid a horrible *deadlock*. +If you read carefully, you might now wonder: _"If a coroutine can call other coroutines, but synchronous functions cannot, how do I enter the async context in the first place?"_. This is done trough a special **synchronous function** (the `start` method of an `AsyncScheduler` object in our case) which can call asynchronous ones, that **must** be called from a synchronous context to avoid a horrible *deadlock*. ## Let's code @@ -80,9 +80,8 @@ async def echo_handler(sock: AsyncSocket, addr: tuple): if __name__ == "__main__": - sched.create_task(server(('', 25000))) try: - sched.run() + sched.start(server(('', 25000))) except KeyboardInterrupt: # Exceptions propagate! print("Exiting...") ``` diff --git a/giambio/_core.py b/giambio/_core.py index 685909d..bb02e38 100644 --- a/giambio/_core.py +++ b/giambio/_core.py @@ -1,17 +1,17 @@ """ - Copyright (C) 2020 nocturn9x +Copyright (C) 2020 nocturn9x - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. """ # Import libraries and internal resources @@ -68,30 +68,28 @@ class AsyncScheduler: if not self.paused: break timeout = 0.0 if self.tasks else None # If there are no tasks ready wait indefinitely - tasks = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks - for key, _ in tasks: + io_ready = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks + for key, _ in io_ready: self.tasks.append(key.data) # Socket ready? Schedule the task self.selector.unregister( - key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now) + key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now) while self.tasks: # While there are tasks to run self.current_task = self.tasks.popleft() # Sets the currently running task + self.current_task.status = "run" try: method, *args = self.current_task.run() # Run a single step with the calculation getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) except CancelledError as cancelled: # Coroutine was cancelled task = cancelled.args[0] task.cancelled = True - self.reschedule_parent() - self.tasks.append(self.current_task) - except RuntimeError: - self.reschedule_parent() + self.tasks.remove(task) except StopIteration as e: # Coroutine ends self.current_task.result = e.args[0] if e.args else None self.current_task.finished = True - self.reschedule_parent() - except Exception as error: # Coroutine raised + self.reschedule_parent(self.current_task) + except BaseException as error: # Coroutine raised self.current_task.exc = error - self.reschedule_parent() + self.reschedule_parent(self.current_task) raise # Maybe find a better way to propagate errors? @@ -109,12 +107,13 @@ class AsyncScheduler: self.create_task(coro) self.run() - def reschedule_parent(self): + def reschedule_parent(self, coro): """Reschedules the parent task""" - popped = self.joined.pop(self.current_task, None) + popped = self.joined.pop(coro, None) if popped: self.tasks.append(popped) + return popped def want_read(self, sock: socket.socket): """Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing""" @@ -139,14 +138,14 @@ class AsyncScheduler: if busy: raise ResourceBusy("The given resource is busy!") - def join(self, coro: types.coroutine): + def join(self, child: types.coroutine): """Handler for the 'join' event, does some magic to tell the scheduler to wait until the passed coroutine ends. The result of this call equals whatever the coroutine returns or, if an exception gets raised, the exception will get propagated inside the parent task""" - if coro not in self.joined: - self.joined[coro] = self.current_task + if child not in self.joined: + self.joined[child] = self.current_task else: raise AlreadyJoinedError("Joining the same task multiple times is not allowed!") @@ -154,6 +153,7 @@ class AsyncScheduler: """Puts the caller to sleep for a given amount of seconds""" self.sequence += 1 + self.current_task.status = "sleep" heappush(self.paused, (self.clock() + seconds, self.sequence, self.current_task)) def cancel(self, task): @@ -161,6 +161,7 @@ class AsyncScheduler: in order to stop it from executing. The loop continues to execute as tasks are independent""" + self.reschedule_parent(task) task.throw(CancelledError(task)) def wrap_socket(self, sock): diff --git a/giambio/_layers.py b/giambio/_layers.py index d693274..ba908f7 100644 --- a/giambio/_layers.py +++ b/giambio/_layers.py @@ -28,6 +28,7 @@ class Task: self.exc = None self.result = None self.finished = False + self.status = "init" def run(self, what=None): """Simple abstraction layer over the coroutines ``send`` method""" @@ -52,4 +53,4 @@ class Task: def __repr__(self): """Implements repr(self)""" - return f"Task({self.coroutine}, cancelled={self.cancelled}, exc={repr(self.exc)}, result={self.result}, finished={self.finished})" + return f"Task({self.coroutine}, cancelled={self.cancelled}, exc={repr(self.exc)}, result={self.result}, finished={self.finished}, status={self.status})" diff --git a/tests/count.py b/tests/count.py index 451a967..8b7c90d 100644 --- a/tests/count.py +++ b/tests/count.py @@ -1,20 +1,20 @@ -from giambio import AsyncScheduler, sleep +import giambio async def countdown(n: int): while n > 0: print(f"Down {n}") n -= 1 - await sleep(1) + await giambio.sleep(1) print("Countdown over") -async def countup(stop, step: int or float = 1): +async def countup(stop: int, step: int = 1): x = 0 while x < stop: print(f"Up {x}") x += 1 - await sleep(step) + await giambio.sleep(step) print("Countup over") @@ -22,14 +22,14 @@ async def main(): cdown = scheduler.create_task(countdown(10)) cup = scheduler.create_task(countup(5, 2)) print("Counters started, awaiting completion") - await sleep(2) - print("Slept 1 second, killing countdown") - await cdown.cancel() + await giambio.sleep(2) + print("Slept 2 seconds, killing countup") + await cup.cancel() ## DOES NOT WORK!!! await cup.join() await cdown.join() print("Task execution complete") if __name__ == "__main__": - scheduler = AsyncScheduler() + scheduler = giambio.AsyncScheduler() scheduler.start(main())