mirror of https://github.com/nocturn9x/giambio.git
Minor fixes to chatroom test. Hopefully fixed queue
This commit is contained in:
parent
a0d376bb35
commit
51d01ba44e
|
@ -126,7 +126,10 @@ class AsyncScheduler:
|
||||||
self.io_skip_limit = io_skip_limit or 5
|
self.io_skip_limit = io_skip_limit or 5
|
||||||
# The max. I/O timeout
|
# The max. I/O timeout
|
||||||
self.io_max_timeout = io_max_timeout or 86400
|
self.io_max_timeout = io_max_timeout or 86400
|
||||||
|
# The loop's entry point
|
||||||
self.entry_point: Optional[Task] = None
|
self.entry_point: Optional[Task] = None
|
||||||
|
# Suspended tasks
|
||||||
|
self.suspended: deque = deque()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
"""
|
"""
|
||||||
|
@ -159,7 +162,7 @@ class AsyncScheduler:
|
||||||
Returns True if there is no work to do
|
Returns True if there is no work to do
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return not any([self.paused, self.run_ready, self.selector.get_map()])
|
return not any([self.paused, self.run_ready, self.selector.get_map(), self.suspended])
|
||||||
|
|
||||||
def shutdown(self):
|
def shutdown(self):
|
||||||
"""
|
"""
|
||||||
|
@ -355,6 +358,7 @@ class AsyncScheduler:
|
||||||
|
|
||||||
if self.current_task.last_io:
|
if self.current_task.last_io:
|
||||||
self.io_release_task(self.current_task)
|
self.io_release_task(self.current_task)
|
||||||
|
self.suspended.append(self.current_task)
|
||||||
|
|
||||||
def reschedule_running(self):
|
def reschedule_running(self):
|
||||||
"""
|
"""
|
||||||
|
@ -450,6 +454,7 @@ class AsyncScheduler:
|
||||||
|
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
self.paused.discard(task)
|
self.paused.discard(task)
|
||||||
|
self.suspended.remove(task)
|
||||||
self.run_ready.extend(tasks)
|
self.run_ready.extend(tasks)
|
||||||
self.reschedule_running()
|
self.reschedule_running()
|
||||||
|
|
||||||
|
|
|
@ -86,10 +86,16 @@ class Queue:
|
||||||
loop._data[task] = item
|
loop._data[task] = item
|
||||||
await schedule_tasks([task])
|
await schedule_tasks([task])
|
||||||
else:
|
else:
|
||||||
|
# We pop in an else block instead of
|
||||||
|
# always doing it because
|
||||||
|
# by setting loop._data[task]
|
||||||
|
# to the item we've already
|
||||||
|
# kind of returned it and if
|
||||||
|
# we also appended it to the deque
|
||||||
|
# it would be as if it was added twice
|
||||||
self.container.append(item)
|
self.container.append(item)
|
||||||
else:
|
else:
|
||||||
self.putters.append(await current_task())
|
self.putters.append(await current_task())
|
||||||
print(self.putters)
|
|
||||||
await suspend()
|
await suspend()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -175,6 +175,15 @@ async def want_write(stream):
|
||||||
await create_trap("register_sock", stream, "write")
|
await create_trap("register_sock", stream, "write")
|
||||||
|
|
||||||
|
|
||||||
|
async def schedule_tasks(tasks: Iterable[Task]):
|
||||||
|
"""
|
||||||
|
Schedules a list of tasks for execution. Usuaully
|
||||||
|
used to unsuspend them after they called suspend()
|
||||||
|
"""
|
||||||
|
|
||||||
|
await create_trap("schedule_tasks", tasks)
|
||||||
|
|
||||||
|
|
||||||
async def event_wait(event):
|
async def event_wait(event):
|
||||||
"""
|
"""
|
||||||
Notifies the event loop that the current task has to wait
|
Notifies the event loop that the current task has to wait
|
||||||
|
@ -186,7 +195,7 @@ async def event_wait(event):
|
||||||
return
|
return
|
||||||
task = await current_task()
|
task = await current_task()
|
||||||
event.waiters.add(task)
|
event.waiters.add(task)
|
||||||
await create_trap("suspend", task)
|
await create_trap("suspend")
|
||||||
|
|
||||||
|
|
||||||
async def event_set(event):
|
async def event_set(event):
|
||||||
|
@ -197,11 +206,3 @@ async def event_set(event):
|
||||||
|
|
||||||
event.set = True
|
event.set = True
|
||||||
await schedule_tasks(event.waiters)
|
await schedule_tasks(event.waiters)
|
||||||
|
|
||||||
|
|
||||||
async def schedule_tasks(tasks: Iterable[Task]):
|
|
||||||
"""
|
|
||||||
Schedules a list of tasks for execution
|
|
||||||
"""
|
|
||||||
|
|
||||||
await create_trap("schedule_tasks", tasks)
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ async def handler(sock: AsyncSocket, client_address: tuple):
|
||||||
logging.info(f"Sending {data!r} to {':'.join(map(str, await client_sock.getpeername()))}")
|
logging.info(f"Sending {data!r} to {':'.join(map(str, await client_sock.getpeername()))}")
|
||||||
if client_sock != sock:
|
if client_sock != sock:
|
||||||
await client_sock.send_all(data)
|
await client_sock.send_all(data)
|
||||||
logging.info(f"Echoed back {data!r} to {i} clients")
|
logging.info(f"Sent {data!r} to {i} clients")
|
||||||
logging.info(f"Connection from {address} closed")
|
logging.info(f"Connection from {address} closed")
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ async def producer(q: giambio.Queue, n: int):
|
||||||
for i in range(n):
|
for i in range(n):
|
||||||
await q.put(i)
|
await q.put(i)
|
||||||
print(f"Produced {i}")
|
print(f"Produced {i}")
|
||||||
|
await giambio.sleep(1)
|
||||||
await q.put(None)
|
await q.put(None)
|
||||||
print("Producer done")
|
print("Producer done")
|
||||||
|
|
||||||
|
@ -17,13 +18,13 @@ async def consumer(q: giambio.Queue):
|
||||||
print("Consumer done")
|
print("Consumer done")
|
||||||
break
|
break
|
||||||
print(f"Consumed {item}")
|
print(f"Consumed {item}")
|
||||||
await giambio.sleep(3)
|
|
||||||
|
|
||||||
|
|
||||||
async def main(q: giambio.Queue, n: int):
|
async def main(q: giambio.Queue, n: int):
|
||||||
async with giambio.create_pool() as pool:
|
async with giambio.create_pool() as pool:
|
||||||
await pool.spawn(producer, q, n)
|
|
||||||
await pool.spawn(consumer, q)
|
await pool.spawn(consumer, q)
|
||||||
|
await pool.spawn(producer, q, n)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue