diff --git a/giambio/sync.py b/giambio/sync.py index eea10e4..f6babab 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -34,8 +34,9 @@ class Event: self.set = False self.waiters = set() + self.value = None - async def trigger(self): + async def trigger(self, value: Optional[Any] = None): """ Sets the event, waking up all tasks that called pause() on it @@ -43,6 +44,7 @@ class Event: if self.set: raise GiambioError("The event has already been set") + self.value = value await event_set(self) async def wait(self): @@ -50,15 +52,15 @@ class Event: Waits until the event is set """ - await event_wait(self) + return await event_wait(self) class Queue: """ An asynchronous FIFO queue similar to asyncio.Queue that uses a collections.deque object for the underlying - data representation. This queue is *NOT* thread-safe - + data representation. This queue is *NOT* thread-safe as + it is based on giambio's Event mechanism """ def __init__(self, maxsize: Optional[int] = None): @@ -80,23 +82,12 @@ class Queue: """ if not self.maxsize or len(self.container) < self.maxsize: + self.container.append(item) if self.getters: - task = self.getters.popleft() - loop = await current_loop() - loop._data[task] = item - await schedule_tasks([task]) - else: - # We pop in an else block instead of - # always doing it because - # by setting loop._data[task] - # to the item we've already - # kind of returned it and if - # we also appended it to the deque - # it would be as if it was added twice - self.container.append(item) + await self.getters.popleft().trigger(self.container.popleft()) else: - self.putters.append(await current_task()) - await suspend() + self.putters.append(Event()) + await self.putters[-1].wait() async def get(self) -> Any: @@ -108,8 +99,8 @@ class Queue: if self.container: if self.putters: - await schedule_tasks([self.putters.popleft()]) + await self.putters.popleft().trigger() return self.container.popleft() else: - self.getters.append(await current_task()) - return await suspend() \ No newline at end of file + self.getters.append(Event()) + return await self.getters[-1].wait() \ No newline at end of file diff --git a/giambio/traps.py b/giambio/traps.py index 0c0a979..240917c 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -195,7 +195,7 @@ async def event_wait(event): return task = await current_task() event.waiters.add(task) - await create_trap("suspend") + return await create_trap("suspend") async def event_set(event): @@ -205,4 +205,7 @@ async def event_set(event): """ event.set = True + loop = await current_loop() + for waiter in event.waiters: + loop._data[waiter] = event.value await schedule_tasks(event.waiters) diff --git a/tests/events.py b/tests/events.py index b024216..f056b2f 100644 --- a/tests/events.py +++ b/tests/events.py @@ -8,9 +8,9 @@ import giambio async def child(ev: giambio.Event, pause: int): print("[child] Child is alive! Going to wait until notified") start_total = giambio.clock() - await ev.wait() + data = await ev.wait() end_pause = giambio.clock() - start_total - print(f"[child] Parent set the event, exiting in {pause} seconds") + print(f"[child] Parent set the event with value {data}, exiting in {pause} seconds") start_sleep = giambio.clock() await giambio.sleep(pause) end_sleep = giambio.clock() - start_sleep @@ -26,7 +26,7 @@ async def parent(pause: int = 1): start = giambio.clock() print(f"[parent] Sleeping {pause} second(s) before setting the event") await giambio.sleep(pause) - await event.trigger() + await event.trigger(True) print("[parent] Event set, awaiting child completion") end = giambio.clock() - start print(f"[parent] Child exited in {end:.2f} seconds") diff --git a/tests/queue.py b/tests/queue.py index da24669..571774b 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -6,7 +6,6 @@ async def producer(q: giambio.Queue, n: int): for i in range(n): await q.put(i) print(f"Produced {i}") - await giambio.sleep(1) await q.put(None) print("Producer done") @@ -18,6 +17,7 @@ async def consumer(q: giambio.Queue): print("Consumer done") break print(f"Consumed {item}") + await giambio.sleep(1)