From 66d7c5126860203e7e876a26af86318791b1b78f Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sat, 14 May 2022 11:19:55 +0200 Subject: [PATCH] Locks stuff + fixes + bugs --- giambio/context.py | 2 +- giambio/core.py | 34 ++++++++++++++++---------------- giambio/internal.py | 2 +- giambio/runtime.py | 18 ++++++----------- tests/cancel.py | 5 +++-- tests/lock.py | 41 +++++++++++++++++++++++++++++++++++++++ tests/nested_exception.py | 2 +- tests/nested_pool.py | 1 + tests/proxy.py | 2 ++ tests/queue.py | 1 + tests/task_ipc.py | 2 +- tests/task_ipc2.py | 1 + tests/timeout3.py | 32 ++++++++++++++++++++++++++++++ 13 files changed, 108 insertions(+), 35 deletions(-) create mode 100644 tests/lock.py create mode 100644 tests/timeout3.py diff --git a/giambio/context.py b/giambio/context.py index 9fe7cfa..ad7d01e 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -55,7 +55,7 @@ class TaskManager: self._proper_init = False self.enclosed_pool: Optional["giambio.context.TaskManager"] = None self.raise_on_timeout: bool = raise_on_timeout - self.entry_point: Optional[Task] = None + self.entry_point: Optional[giambio.Task] = None async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task": """ diff --git a/giambio/core.py b/giambio/core.py index 73270ec..e6f68d9 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -17,6 +17,7 @@ limitations under the License. """ # Import libraries and internal resources +from lib2to3.pytree import Base import types from giambio.task import Task from collections import deque @@ -298,7 +299,10 @@ class AsyncScheduler: # We need to make sure we don't try to execute # exited tasks that are on the running queue return - if not self.current_pool: + if self.current_pool: + if self.current_task.pool and self.current_task.pool is not self.current_pool: + self.current_task.pool.enclosed_pool = self.current_pool + else: self.current_pool = self.current_task.pool pool = self.current_pool while pool: @@ -462,7 +466,8 @@ class AsyncScheduler: for task in tasks: self.paused.discard(task) - self.suspended.remove(task) + if task in self.suspended: + self.suspended.remove(task) self.run_ready.extend(tasks) self.reschedule_running() @@ -493,7 +498,7 @@ class AsyncScheduler: :return: The closest deadline according to our clock :rtype: float """ - + if not self.deadlines: # If there are no deadlines just wait until the first task wakeup timeout = max(0.0, self.paused.get_closest_deadline() - self.clock()) @@ -616,8 +621,9 @@ class AsyncScheduler: If ensure_done equals False, the loop will cancel ALL running and scheduled tasks and then tear itself down. If ensure_done equals True, which is the default behavior, - this method will raise a GiambioError if the loop hasn't - finished running. + this method will raise a GiambioError exception if the loop + hasn't finished running. The state of the event loop is reset + so it can be reused with another run() call """ if ensure_done: @@ -644,18 +650,11 @@ class AsyncScheduler: if task.pool and task.pool.enclosed_pool and not task.pool.enclosed_pool.done(): return - for t in task.joiners: - if t not in self.run_ready: - # Since a task can be the parent - # of multiple children, we need to - # make sure we reschedule it only - # once, otherwise a RuntimeError will - # occur - self.run_ready.append(t) + self.run_ready.extend(task.joiners) def join(self, task: Task): """ - Joins a task to its callers (implicitly, the parent + Joins a task to its callers (implicitly the parent task, but also every other task who called await task.join() on the task object) """ @@ -668,8 +667,9 @@ class AsyncScheduler: self.io_release_task(task) if task in self.suspended: self.suspended.remove(task) - # If the pool has finished executing or we're at the first parent - # task that kicked the loop, we can safely reschedule the parent(s) + # If the pool (including any enclosing pools) has finished executing + # or we're at the first task that kicked the loop, we can safely + # reschedule the parent(s) if task.pool is None: return if task.pool.done(): @@ -680,7 +680,7 @@ class AsyncScheduler: task.status = "crashed" if task.exc.__traceback__: # TODO: We might want to do a bit more complex traceback hacking to remove any extra - # frames from the exception call stack, but for now removing at least the first one + # frames from the exception call stack, but for now removing at least the first few # seems a sensible approach (it's us catching it so we don't care about that) for _ in range(5): if task.exc.__traceback__.tb_next: diff --git a/giambio/internal.py b/giambio/internal.py index 0449e03..593f065 100644 --- a/giambio/internal.py +++ b/giambio/internal.py @@ -273,7 +273,7 @@ class DeadlinesQueue: :param pool: The pool object to store """ - if pool and pool not in self.pools and not pool.done() and not pool.timed_out and pool.timeout: + if pool and pool not in self.pools and not pool.timed_out and pool.timeout: self.pools.add(pool) heappush(self.container, (pool.timeout, self.sequence, pool)) self.sequence += 1 diff --git a/giambio/runtime.py b/giambio/runtime.py index b9217ad..8029c71 100644 --- a/giambio/runtime.py +++ b/giambio/runtime.py @@ -95,19 +95,16 @@ def create_pool(): return TaskManager() - def with_timeout(timeout: int or float): """ Creates an async pool with an associated timeout """ assert timeout > 0, "The timeout must be greater than 0" + mgr = TaskManager(timeout) loop = get_event_loop() - mgr = TaskManager(timeout) - if loop.current_task is not loop.entry_point: - mgr.tasks.append(loop.current_task) - if loop.current_pool and loop.current_pool is not mgr: - loop.current_pool.enclosed_pool = mgr + if loop.current_task is loop.entry_point: + loop.current_pool = mgr return mgr @@ -119,11 +116,8 @@ def skip_after(timeout: int or float): """ assert timeout > 0, "The timeout must be greater than 0" + mgr = TaskManager(timeout, False) loop = get_event_loop() - mgr = TaskManager(timeout, False) - if loop.current_task is not loop.entry_point: - mgr.tasks.append(loop.current_task) - if loop.current_pool and loop.current_pool is not mgr: - loop.current_pool.enclosed_pool = mgr + if loop.current_task is loop.entry_point: + loop.current_pool = mgr return mgr - diff --git a/tests/cancel.py b/tests/cancel.py index b623b7a..c75a51a 100644 --- a/tests/cancel.py +++ b/tests/cancel.py @@ -11,10 +11,11 @@ async def child(name: int): async def main(): start = giambio.clock() async with giambio.create_pool() as pool: - await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately! + # await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately! task = await pool.spawn(child, 2) - await task.cancel() print("[main] Children spawned, awaiting completion") + await task.cancel() + print("[main] Second child cancelled") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") diff --git a/tests/lock.py b/tests/lock.py new file mode 100644 index 0000000..62530e8 --- /dev/null +++ b/tests/lock.py @@ -0,0 +1,41 @@ +from debugger import Debugger +import giambio + + +async def child(start: int, limit: int, q: giambio.Queue, l: giambio.Lock): + async with l: # If you comment this line, the resulting queue will + # be all mixed up! + # Locked! If any other task + # tries to acquire the lock, + # they'll wait for us to finish + while start <= limit: + print(f"[locked ({limit})] {start}") + await q.put(start) + start += 1 + await giambio.sleep(1) + + +async def other(stop: int): + # Other tasks are unaffected + # by the lock if they don't + # acquire it + n = stop + while n: + print(f"[unlocked ({stop})] {n}") + n -= 1 + await giambio.sleep(1) + + +async def main(): + queue = giambio.Queue() + lock = giambio.Lock() + async with giambio.create_pool() as pool: + await pool.spawn(child, 1, 5, queue, lock) + await pool.spawn(child, 6, 10, queue, lock) + await pool.spawn(other, 10) + await pool.spawn(other, 5) + print(f"Result: {queue}") # Queue is ordered! + print("[main] Done") + + +giambio.run(main, debugger=()) \ No newline at end of file diff --git a/tests/nested_exception.py b/tests/nested_exception.py index 4f1f367..165ac1e 100644 --- a/tests/nested_exception.py +++ b/tests/nested_exception.py @@ -36,7 +36,7 @@ async def main(): print("[main] First 2 children spawned, awaiting completion") async with giambio.create_pool() as new_pool: # This pool will be cancelled by the exception - # in the other pool + # in the outer pool await new_pool.spawn(child2) await new_pool.spawn(child3) print("[main] Third and fourth children spawned") diff --git a/tests/nested_pool.py b/tests/nested_pool.py index 428e980..32637fd 100644 --- a/tests/nested_pool.py +++ b/tests/nested_pool.py @@ -1,3 +1,4 @@ +## Example for nested task pools import giambio from debugger import Debugger diff --git a/tests/proxy.py b/tests/proxy.py index 8d5ff8d..682bde1 100644 --- a/tests/proxy.py +++ b/tests/proxy.py @@ -1,3 +1,5 @@ +## Two-way proxy example stolen from njsmith's talk about trio + from debugger import Debugger import giambio import socket diff --git a/tests/queue.py b/tests/queue.py index f1476dc..2e70124 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -1,3 +1,4 @@ +## Producer-consumer code using giambio's async queue import giambio from debugger import Debugger diff --git a/tests/task_ipc.py b/tests/task_ipc.py index 64383fa..4249624 100644 --- a/tests/task_ipc.py +++ b/tests/task_ipc.py @@ -1,4 +1,4 @@ -## SImple task IPC using giambio's MemoryChannel class +## Simple task IPC using giambio's MemoryChannel class import random import string import giambio diff --git a/tests/task_ipc2.py b/tests/task_ipc2.py index 673d227..7c6f30c 100644 --- a/tests/task_ipc2.py +++ b/tests/task_ipc2.py @@ -1,3 +1,4 @@ +## Simple task IPC using giambio's NetworkChannel class import random import string import giambio diff --git a/tests/timeout3.py b/tests/timeout3.py new file mode 100644 index 0000000..031fea9 --- /dev/null +++ b/tests/timeout3.py @@ -0,0 +1,32 @@ +import giambio +from debugger import Debugger + + +async def child(name: int): + print(f"[child {name}] Child spawned!! Sleeping for {name} seconds") + await giambio.sleep(name) + print(f"[child {name}] Had a nice nap!") + + +async def worker(coro, *args): + try: + async with giambio.with_timeout(10): + await coro(*args) + except giambio.exceptions.TooSlowError: + return True + return False + + +async def main(): + start = giambio.clock() + async with giambio.skip_after(10) as pool: + t = await pool.spawn(worker, child, 7) + await giambio.sleep(2) + t2 = await pool.spawn(worker, child, 15) + if any([await t.join(), await t2.join()]): + print("[main] One or more children have timed out!") + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") + + +if __name__ == "__main__": + giambio.run(main, debugger=()) \ No newline at end of file