Hopefully fixed giambio.Queue

This commit is contained in:
Nocturn9x 2022-02-26 21:59:18 +01:00
parent da9a579caf
commit 3eb6844848
4 changed files with 17 additions and 13 deletions

View File

@ -95,7 +95,7 @@ class AsyncScheduler:
or type( or type(
"DumbDebugger", "DumbDebugger",
(object,), (object,),
{"__getattr__": lambda *args: lambda *arg: None}, {"__getattr__": lambda *_: lambda *_: None},
)() )()
) )
# All tasks the loop has # All tasks the loop has
@ -151,6 +151,8 @@ class AsyncScheduler:
"_data", "_data",
"io_skip_limit", "io_skip_limit",
"io_max_timeout", "io_max_timeout",
"suspended",
"entry_point"
} }
data = ", ".join( data = ", ".join(
name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields)) name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields))
@ -360,6 +362,7 @@ class AsyncScheduler:
self.io_release_task(self.current_task) self.io_release_task(self.current_task)
self.suspended.append(self.current_task) self.suspended.append(self.current_task)
def reschedule_running(self): def reschedule_running(self):
""" """
Reschedules the currently running task Reschedules the currently running task

View File

@ -17,7 +17,7 @@ limitations under the License.
""" """
from collections import deque from collections import deque
from typing import Any, Optional from typing import Any, Optional
from giambio.traps import event_wait, event_set, current_task, suspend, schedule_tasks, current_loop from giambio.traps import event_wait, event_set
from giambio.exceptions import GiambioError from giambio.exceptions import GiambioError
@ -71,7 +71,7 @@ class Queue:
self.maxsize = maxsize self.maxsize = maxsize
self.getters = deque() self.getters = deque()
self.putters = deque() self.putters = deque()
self.container = deque(maxlen=maxsize) self.container = deque()
async def put(self, item: Any): async def put(self, item: Any):
@ -81,14 +81,15 @@ class Queue:
enough space for the queue enough space for the queue
""" """
if not self.maxsize or len(self.container) < self.maxsize: if self.maxsize and len(self.container) < self.maxsize:
self.container.append(item) self.container.append(item)
if self.getters: if self.getters:
await self.getters.popleft().trigger(self.container.popleft()) await self.getters.popleft().trigger(self.container.popleft())
else: else:
self.putters.append(Event()) ev = Event()
await self.putters[-1].wait() self.putters.append(ev)
await ev.wait()
self.container.append(item)
async def get(self) -> Any: async def get(self) -> Any:
""" """
@ -102,5 +103,6 @@ class Queue:
await self.putters.popleft().trigger() await self.putters.popleft().trigger()
return self.container.popleft() return self.container.popleft()
else: else:
self.getters.append(Event()) ev = Event()
return await self.getters[-1].wait() self.getters.append(ev)
return await ev.wait()

View File

@ -195,7 +195,7 @@ async def event_wait(event):
return return
task = await current_task() task = await current_task()
event.waiters.add(task) event.waiters.add(task)
return await create_trap("suspend") return await suspend()
async def event_set(event): async def event_set(event):

View File

@ -18,15 +18,14 @@ async def consumer(q: giambio.Queue):
break break
print(f"Consumed {item}") print(f"Consumed {item}")
await giambio.sleep(1) await giambio.sleep(1)
async def main(q: giambio.Queue, n: int): async def main(q: giambio.Queue, n: int):
async with giambio.create_pool() as pool: async with giambio.create_pool() as pool:
await pool.spawn(consumer, q)
await pool.spawn(producer, q, n) await pool.spawn(producer, q, n)
await pool.spawn(consumer, q)
queue = giambio.Queue() queue = giambio.Queue(2)
giambio.run(main, queue, 5, debugger=()) giambio.run(main, queue, 5, debugger=())