diff --git a/giambio/core.py b/giambio/core.py index de77088..237c76b 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -126,7 +126,10 @@ class AsyncScheduler: self.io_skip_limit = io_skip_limit or 5 # The max. I/O timeout self.io_max_timeout = io_max_timeout or 86400 + # The loop's entry point self.entry_point: Optional[Task] = None + # Suspended tasks + self.suspended: deque = deque() def __repr__(self): """ @@ -159,7 +162,7 @@ class AsyncScheduler: Returns True if there is no work to do """ - return not any([self.paused, self.run_ready, self.selector.get_map()]) + return not any([self.paused, self.run_ready, self.selector.get_map(), self.suspended]) def shutdown(self): """ @@ -355,6 +358,7 @@ class AsyncScheduler: if self.current_task.last_io: self.io_release_task(self.current_task) + self.suspended.append(self.current_task) def reschedule_running(self): """ @@ -450,6 +454,7 @@ class AsyncScheduler: for task in tasks: self.paused.discard(task) + self.suspended.remove(task) self.run_ready.extend(tasks) self.reschedule_running() diff --git a/giambio/sync.py b/giambio/sync.py index 2bb8d2e..eea10e4 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -86,10 +86,16 @@ class Queue: loop._data[task] = item await schedule_tasks([task]) else: + # We pop in an else block instead of + # always doing it because + # by setting loop._data[task] + # to the item we've already + # kind of returned it and if + # we also appended it to the deque + # it would be as if it was added twice self.container.append(item) else: self.putters.append(await current_task()) - print(self.putters) await suspend() diff --git a/giambio/traps.py b/giambio/traps.py index fc81665..0c0a979 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -175,6 +175,15 @@ async def want_write(stream): await create_trap("register_sock", stream, "write") +async def schedule_tasks(tasks: Iterable[Task]): + """ + Schedules a list of tasks for execution. Usuaully + used to unsuspend them after they called suspend() + """ + + await create_trap("schedule_tasks", tasks) + + async def event_wait(event): """ Notifies the event loop that the current task has to wait @@ -186,7 +195,7 @@ async def event_wait(event): return task = await current_task() event.waiters.add(task) - await create_trap("suspend", task) + await create_trap("suspend") async def event_set(event): @@ -197,11 +206,3 @@ async def event_set(event): event.set = True await schedule_tasks(event.waiters) - - -async def schedule_tasks(tasks: Iterable[Task]): - """ - Schedules a list of tasks for execution - """ - - await create_trap("schedule_tasks", tasks) diff --git a/tests/chatroom.py b/tests/chatroom.py index 5aec80a..1a18670 100644 --- a/tests/chatroom.py +++ b/tests/chatroom.py @@ -60,7 +60,7 @@ async def handler(sock: AsyncSocket, client_address: tuple): logging.info(f"Sending {data!r} to {':'.join(map(str, await client_sock.getpeername()))}") if client_sock != sock: await client_sock.send_all(data) - logging.info(f"Echoed back {data!r} to {i} clients") + logging.info(f"Sent {data!r} to {i} clients") logging.info(f"Connection from {address} closed") diff --git a/tests/queue.py b/tests/queue.py index 627ffc9..da24669 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -6,6 +6,7 @@ async def producer(q: giambio.Queue, n: int): for i in range(n): await q.put(i) print(f"Produced {i}") + await giambio.sleep(1) await q.put(None) print("Producer done") @@ -17,13 +18,13 @@ async def consumer(q: giambio.Queue): print("Consumer done") break print(f"Consumed {item}") - await giambio.sleep(3) + async def main(q: giambio.Queue, n: int): async with giambio.create_pool() as pool: - await pool.spawn(producer, q, n) await pool.spawn(consumer, q) + await pool.spawn(producer, q, n)