From 3eb6844848fb1b0f3112f825426661e4afa5581a Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sat, 26 Feb 2022 21:59:18 +0100 Subject: [PATCH] Hopefully fixed giambio.Queue --- giambio/core.py | 5 ++++- giambio/sync.py | 18 ++++++++++-------- giambio/traps.py | 2 +- tests/queue.py | 5 ++--- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/giambio/core.py b/giambio/core.py index 1715271..ee48c00 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -95,7 +95,7 @@ class AsyncScheduler: or type( "DumbDebugger", (object,), - {"__getattr__": lambda *args: lambda *arg: None}, + {"__getattr__": lambda *_: lambda *_: None}, )() ) # All tasks the loop has @@ -151,6 +151,8 @@ class AsyncScheduler: "_data", "io_skip_limit", "io_max_timeout", + "suspended", + "entry_point" } data = ", ".join( name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields)) @@ -360,6 +362,7 @@ class AsyncScheduler: self.io_release_task(self.current_task) self.suspended.append(self.current_task) + def reschedule_running(self): """ Reschedules the currently running task diff --git a/giambio/sync.py b/giambio/sync.py index f6babab..9da7a3a 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -17,7 +17,7 @@ limitations under the License. """ from collections import deque from typing import Any, Optional -from giambio.traps import event_wait, event_set, current_task, suspend, schedule_tasks, current_loop +from giambio.traps import event_wait, event_set from giambio.exceptions import GiambioError @@ -71,7 +71,7 @@ class Queue: self.maxsize = maxsize self.getters = deque() self.putters = deque() - self.container = deque(maxlen=maxsize) + self.container = deque() async def put(self, item: Any): @@ -81,14 +81,15 @@ class Queue: enough space for the queue """ - if not self.maxsize or len(self.container) < self.maxsize: + if self.maxsize and len(self.container) < self.maxsize: self.container.append(item) if self.getters: await self.getters.popleft().trigger(self.container.popleft()) else: - self.putters.append(Event()) - await self.putters[-1].wait() - + ev = Event() + self.putters.append(ev) + await ev.wait() + self.container.append(item) async def get(self) -> Any: """ @@ -102,5 +103,6 @@ class Queue: await self.putters.popleft().trigger() return self.container.popleft() else: - self.getters.append(Event()) - return await self.getters[-1].wait() \ No newline at end of file + ev = Event() + self.getters.append(ev) + return await ev.wait() diff --git a/giambio/traps.py b/giambio/traps.py index 240917c..b15a838 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -195,7 +195,7 @@ async def event_wait(event): return task = await current_task() event.waiters.add(task) - return await create_trap("suspend") + return await suspend() async def event_set(event): diff --git a/tests/queue.py b/tests/queue.py index d8dc4bb..7787bb1 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -18,15 +18,14 @@ async def consumer(q: giambio.Queue): break print(f"Consumed {item}") await giambio.sleep(1) - async def main(q: giambio.Queue, n: int): async with giambio.create_pool() as pool: - await pool.spawn(consumer, q) await pool.spawn(producer, q, n) + await pool.spawn(consumer, q) -queue = giambio.Queue() +queue = giambio.Queue(2) giambio.run(main, queue, 5, debugger=())