From ad34be87547aef657361b2b932b653e19ed4130d Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Tue, 10 May 2022 11:56:47 +0200 Subject: [PATCH] Added locks --- giambio/__init__.py | 3 +- giambio/core.py | 28 +++++++++++++++---- giambio/runtime.py | 24 ++++++++-------- giambio/sync.py | 67 +++++++++++++++++++++++++++++++++++++++++++-- giambio/traps.py | 2 +- tests/queue.py | 12 ++++++-- tests/task_ipc.py | 1 + tests/timeout2.py | 8 +++--- 8 files changed, 116 insertions(+), 29 deletions(-) diff --git a/giambio/__init__.py b/giambio/__init__.py index 520db85..71989a5 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -22,7 +22,7 @@ __version__ = (0, 0, 1) from giambio import exceptions, socket, context, core, task, io from giambio.traps import sleep, current_task -from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel +from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel, Lock from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after from giambio.util import debug @@ -37,6 +37,7 @@ __all__ = [ "Channel", "NetworkChannel", "MemoryChannel", + "Lock", "run", "clock", "create_pool", diff --git a/giambio/core.py b/giambio/core.py index 48fb734..73270ec 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -130,6 +130,7 @@ class AsyncScheduler: self.entry_point: Optional[Task] = None # Suspended tasks self.suspended: deque = deque() + def __repr__(self): """ @@ -209,7 +210,10 @@ class AsyncScheduler: # after it is set, but it makes the implementation easier if not self.current_pool and self.current_task.pool: self.current_pool = self.current_task.pool - self.deadlines.put(self.current_pool) + pool = self.current_pool + while pool: + self.deadlines.put(pool) + pool = self.current_pool.enclosed_pool # If there are no actively running tasks, we start by # checking for I/O. This method will wait for I/O until # the closest deadline to avoid starving sleeping tasks @@ -288,16 +292,18 @@ class AsyncScheduler: account, that's self.run's job! """ - data = None # Sets the currently running task self.current_task = self.run_ready.popleft() if self.current_task.done(): # We need to make sure we don't try to execute # exited tasks that are on the running queue return - if not self.current_pool and self.current_task.pool: + if not self.current_pool: self.current_pool = self.current_task.pool - self.deadlines.put(self.current_pool) + pool = self.current_pool + while pool: + self.deadlines.put(pool) + pool = self.current_pool.enclosed_pool self.debugger.before_task_step(self.current_task) # Some debugging and internal chatter here self.current_task.status = "run" @@ -363,7 +369,6 @@ class AsyncScheduler: self.current_task.status = "sleep" self.suspended.append(self.current_task) - def reschedule_running(self): """ Reschedules the currently running task @@ -444,8 +449,8 @@ class AsyncScheduler: self.cancel_pool(pool) for task in pool.tasks: self.join(task) - self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point))) if pool.entry_point is self.entry_point: + self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point))) self.run_ready.append(self.entry_point) def schedule_tasks(self, tasks: List[Task]): @@ -480,6 +485,7 @@ class AsyncScheduler: self.run_ready.append(task) self.debugger.after_sleep(task, slept) + def get_closest_deadline(self) -> float: """ Gets the closest expiration deadline (asleep tasks, timeouts) @@ -619,6 +625,16 @@ class AsyncScheduler: elif not self.done(): raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit") self.shutdown() + # We reset the event loop's state + self.tasks = [] + self.entry_point = None + self.current_pool = None + self.current_task = None + self.paused = TimeQueue(self.clock) + self.deadlines = DeadlinesQueue() + self.run_ready = deque() + self.suspended = deque() + def reschedule_joiners(self, task: Task): """ diff --git a/giambio/runtime.py b/giambio/runtime.py index bf14e28..b9217ad 100644 --- a/giambio/runtime.py +++ b/giambio/runtime.py @@ -95,19 +95,19 @@ def create_pool(): return TaskManager() + def with_timeout(timeout: int or float): """ Creates an async pool with an associated timeout """ assert timeout > 0, "The timeout must be greater than 0" - mgr = TaskManager(timeout) loop = get_event_loop() - if loop.current_task.pool is None: - loop.current_pool = mgr - loop.current_task.pool = mgr - loop.current_task.next_deadline = mgr.timeout or 0.0 - loop.deadlines.put(mgr) + mgr = TaskManager(timeout) + if loop.current_task is not loop.entry_point: + mgr.tasks.append(loop.current_task) + if loop.current_pool and loop.current_pool is not mgr: + loop.current_pool.enclosed_pool = mgr return mgr @@ -119,11 +119,11 @@ def skip_after(timeout: int or float): """ assert timeout > 0, "The timeout must be greater than 0" - mgr = TaskManager(timeout, False) loop = get_event_loop() - if loop.current_task.pool is None: - loop.current_pool = mgr - loop.current_task.pool = mgr - loop.current_task.next_deadline = mgr.timeout or 0.0 - loop.deadlines.put(mgr) + mgr = TaskManager(timeout, False) + if loop.current_task is not loop.entry_point: + mgr.tasks.append(loop.current_task) + if loop.current_pool and loop.current_pool is not mgr: + loop.current_pool.enclosed_pool = mgr return mgr + diff --git a/giambio/sync.py b/giambio/sync.py index 74a0163..ee1f600 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -19,9 +19,10 @@ from socket import socketpair from abc import ABC, abstractmethod from collections import deque from typing import Any, Optional -from giambio.traps import event_wait, event_set +from giambio.traps import event_wait, event_set, current_task from giambio.exceptions import GiambioError from giambio.socket import wrap_socket +from giambio.task import Task class Event: @@ -72,7 +73,11 @@ class Queue: """ self.maxsize = maxsize + # Stores event objects for tasks wanting to + # get items from the queue self.getters = deque() + # Stores event objects for tasks wanting to + # put items on the queue self.putters = deque() self.container = deque() @@ -84,16 +89,19 @@ class Queue: return len(self.container) + def __repr__(self) -> str: + return f"{type(self).__name__}({f', '.join(map(str, self.container))})" + async def __aiter__(self): """ - Implements the iterator protocol + Implements the asynchronous iterator protocol """ return self async def __anext__(self): """ - Implements the iterator protocol + Implements the asynchronous iterator protocol """ return await self.get() @@ -325,3 +333,56 @@ class NetworkChannel(Channel): except BlockingIOError: return False return True + + +class Lock: + """ + A simple single-owner lock + """ + + def __init__(self): + """ + Public constructor + """ + + self.owner: Optional[Task] = None + self.tasks: deque[Event] = deque() + + async def acquire(self): + """ + Acquires the lock + """ + + task = await current_task() + if self.owner is None: + self.owner = task + elif task is self.owner: + raise RuntimeError("lock is already acquired by current task") + elif self.owner is not task: + self.tasks.append(Event()) + await self.tasks[-1].wait() + self.owner = task + + async def release(self): + """ + Releases the lock + """ + + task = await current_task() + if self.owner is None: + raise RuntimeError("lock is not acquired") + elif self.owner is not task: + raise RuntimeError("lock can only released by its owner") + elif self.tasks: + await self.tasks.popleft().trigger() + else: + self.owner = None + + + async def __aenter__(self): + await self.acquire() + return self + + + async def __aexit__(self, *args): + await self.release() diff --git a/giambio/traps.py b/giambio/traps.py index 2e1e8f6..f7dd8bf 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -209,4 +209,4 @@ async def event_set(event, value): loop = await current_loop() for waiter in event.waiters: loop._data[waiter] = event.value - await schedule_tasks(event.waiters) + await schedule_tasks(event.waiters) \ No newline at end of file diff --git a/tests/queue.py b/tests/queue.py index afe761a..f1476dc 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -4,6 +4,9 @@ from debugger import Debugger async def producer(q: giambio.Queue, n: int): for i in range(n): + # This will wait until the + # queue is emptied by the + # consumer await q.put(i) print(f"Produced {i}") await q.put(None) @@ -12,11 +15,16 @@ async def producer(q: giambio.Queue, n: int): async def consumer(q: giambio.Queue): while True: + # Hangs until there is + # something on the queue item = await q.get() if item is None: print("Consumer done") break print(f"Consumed {item}") + # Simulates some work so the + # producer waits before putting + # the next value await giambio.sleep(1) @@ -24,8 +32,8 @@ async def main(q: giambio.Queue, n: int): async with giambio.create_pool() as pool: await pool.spawn(producer, q, n) await pool.spawn(consumer, q) - + print("Bye!") -queue = giambio.Queue() +queue = giambio.Queue(1) # Queue has size limit of 1 giambio.run(main, queue, 5, debugger=()) diff --git a/tests/task_ipc.py b/tests/task_ipc.py index 87faf48..64383fa 100644 --- a/tests/task_ipc.py +++ b/tests/task_ipc.py @@ -1,3 +1,4 @@ +## SImple task IPC using giambio's MemoryChannel class import random import string import giambio diff --git a/tests/timeout2.py b/tests/timeout2.py index 42d75fe..b26f2bc 100644 --- a/tests/timeout2.py +++ b/tests/timeout2.py @@ -11,11 +11,11 @@ async def child(name: int): async def main(): start = giambio.clock() async with giambio.skip_after(10) as pool: - await pool.spawn(child, 7) # This will complete - await giambio.sleep(2) # This will make the code below wait 2 seconds + await pool.spawn(child, 7) # This will complete + await giambio.sleep(2) # This will make the code below wait 2 seconds await pool.spawn(child, 15) # This will not complete - await giambio.sleep(50) - await child(20) # Neither will this + await giambio.sleep(50) # Neither will this + await child(20) # Nor this if pool.timed_out: print("[main] One or more children have timed out!") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")