mirror of https://github.com/nocturn9x/giambio.git
Moved the queueing mechanism to use events instead of event loop hacks
This commit is contained in:
parent
51d01ba44e
commit
594c69ed84
|
@ -34,8 +34,9 @@ class Event:
|
||||||
|
|
||||||
self.set = False
|
self.set = False
|
||||||
self.waiters = set()
|
self.waiters = set()
|
||||||
|
self.value = None
|
||||||
|
|
||||||
async def trigger(self):
|
async def trigger(self, value: Optional[Any] = None):
|
||||||
"""
|
"""
|
||||||
Sets the event, waking up all tasks that called
|
Sets the event, waking up all tasks that called
|
||||||
pause() on it
|
pause() on it
|
||||||
|
@ -43,6 +44,7 @@ class Event:
|
||||||
|
|
||||||
if self.set:
|
if self.set:
|
||||||
raise GiambioError("The event has already been set")
|
raise GiambioError("The event has already been set")
|
||||||
|
self.value = value
|
||||||
await event_set(self)
|
await event_set(self)
|
||||||
|
|
||||||
async def wait(self):
|
async def wait(self):
|
||||||
|
@ -50,15 +52,15 @@ class Event:
|
||||||
Waits until the event is set
|
Waits until the event is set
|
||||||
"""
|
"""
|
||||||
|
|
||||||
await event_wait(self)
|
return await event_wait(self)
|
||||||
|
|
||||||
|
|
||||||
class Queue:
|
class Queue:
|
||||||
"""
|
"""
|
||||||
An asynchronous FIFO queue similar to asyncio.Queue
|
An asynchronous FIFO queue similar to asyncio.Queue
|
||||||
that uses a collections.deque object for the underlying
|
that uses a collections.deque object for the underlying
|
||||||
data representation. This queue is *NOT* thread-safe
|
data representation. This queue is *NOT* thread-safe as
|
||||||
|
it is based on giambio's Event mechanism
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, maxsize: Optional[int] = None):
|
def __init__(self, maxsize: Optional[int] = None):
|
||||||
|
@ -80,23 +82,12 @@ class Queue:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not self.maxsize or len(self.container) < self.maxsize:
|
if not self.maxsize or len(self.container) < self.maxsize:
|
||||||
|
self.container.append(item)
|
||||||
if self.getters:
|
if self.getters:
|
||||||
task = self.getters.popleft()
|
await self.getters.popleft().trigger(self.container.popleft())
|
||||||
loop = await current_loop()
|
|
||||||
loop._data[task] = item
|
|
||||||
await schedule_tasks([task])
|
|
||||||
else:
|
|
||||||
# We pop in an else block instead of
|
|
||||||
# always doing it because
|
|
||||||
# by setting loop._data[task]
|
|
||||||
# to the item we've already
|
|
||||||
# kind of returned it and if
|
|
||||||
# we also appended it to the deque
|
|
||||||
# it would be as if it was added twice
|
|
||||||
self.container.append(item)
|
|
||||||
else:
|
else:
|
||||||
self.putters.append(await current_task())
|
self.putters.append(Event())
|
||||||
await suspend()
|
await self.putters[-1].wait()
|
||||||
|
|
||||||
|
|
||||||
async def get(self) -> Any:
|
async def get(self) -> Any:
|
||||||
|
@ -108,8 +99,8 @@ class Queue:
|
||||||
|
|
||||||
if self.container:
|
if self.container:
|
||||||
if self.putters:
|
if self.putters:
|
||||||
await schedule_tasks([self.putters.popleft()])
|
await self.putters.popleft().trigger()
|
||||||
return self.container.popleft()
|
return self.container.popleft()
|
||||||
else:
|
else:
|
||||||
self.getters.append(await current_task())
|
self.getters.append(Event())
|
||||||
return await suspend()
|
return await self.getters[-1].wait()
|
|
@ -195,7 +195,7 @@ async def event_wait(event):
|
||||||
return
|
return
|
||||||
task = await current_task()
|
task = await current_task()
|
||||||
event.waiters.add(task)
|
event.waiters.add(task)
|
||||||
await create_trap("suspend")
|
return await create_trap("suspend")
|
||||||
|
|
||||||
|
|
||||||
async def event_set(event):
|
async def event_set(event):
|
||||||
|
@ -205,4 +205,7 @@ async def event_set(event):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
event.set = True
|
event.set = True
|
||||||
|
loop = await current_loop()
|
||||||
|
for waiter in event.waiters:
|
||||||
|
loop._data[waiter] = event.value
|
||||||
await schedule_tasks(event.waiters)
|
await schedule_tasks(event.waiters)
|
||||||
|
|
|
@ -8,9 +8,9 @@ import giambio
|
||||||
async def child(ev: giambio.Event, pause: int):
|
async def child(ev: giambio.Event, pause: int):
|
||||||
print("[child] Child is alive! Going to wait until notified")
|
print("[child] Child is alive! Going to wait until notified")
|
||||||
start_total = giambio.clock()
|
start_total = giambio.clock()
|
||||||
await ev.wait()
|
data = await ev.wait()
|
||||||
end_pause = giambio.clock() - start_total
|
end_pause = giambio.clock() - start_total
|
||||||
print(f"[child] Parent set the event, exiting in {pause} seconds")
|
print(f"[child] Parent set the event with value {data}, exiting in {pause} seconds")
|
||||||
start_sleep = giambio.clock()
|
start_sleep = giambio.clock()
|
||||||
await giambio.sleep(pause)
|
await giambio.sleep(pause)
|
||||||
end_sleep = giambio.clock() - start_sleep
|
end_sleep = giambio.clock() - start_sleep
|
||||||
|
@ -26,7 +26,7 @@ async def parent(pause: int = 1):
|
||||||
start = giambio.clock()
|
start = giambio.clock()
|
||||||
print(f"[parent] Sleeping {pause} second(s) before setting the event")
|
print(f"[parent] Sleeping {pause} second(s) before setting the event")
|
||||||
await giambio.sleep(pause)
|
await giambio.sleep(pause)
|
||||||
await event.trigger()
|
await event.trigger(True)
|
||||||
print("[parent] Event set, awaiting child completion")
|
print("[parent] Event set, awaiting child completion")
|
||||||
end = giambio.clock() - start
|
end = giambio.clock() - start
|
||||||
print(f"[parent] Child exited in {end:.2f} seconds")
|
print(f"[parent] Child exited in {end:.2f} seconds")
|
||||||
|
|
|
@ -6,7 +6,6 @@ async def producer(q: giambio.Queue, n: int):
|
||||||
for i in range(n):
|
for i in range(n):
|
||||||
await q.put(i)
|
await q.put(i)
|
||||||
print(f"Produced {i}")
|
print(f"Produced {i}")
|
||||||
await giambio.sleep(1)
|
|
||||||
await q.put(None)
|
await q.put(None)
|
||||||
print("Producer done")
|
print("Producer done")
|
||||||
|
|
||||||
|
@ -18,6 +17,7 @@ async def consumer(q: giambio.Queue):
|
||||||
print("Consumer done")
|
print("Consumer done")
|
||||||
break
|
break
|
||||||
print(f"Consumed {item}")
|
print(f"Consumed {item}")
|
||||||
|
await giambio.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue