mirror of https://github.com/nocturn9x/giambio.git
Compare commits
2 Commits
a0d376bb35
...
594c69ed84
Author | SHA1 | Date |
---|---|---|
Nocturn9x | 594c69ed84 | |
Nocturn9x | 51d01ba44e |
|
@ -126,7 +126,10 @@ class AsyncScheduler:
|
|||
self.io_skip_limit = io_skip_limit or 5
|
||||
# The max. I/O timeout
|
||||
self.io_max_timeout = io_max_timeout or 86400
|
||||
# The loop's entry point
|
||||
self.entry_point: Optional[Task] = None
|
||||
# Suspended tasks
|
||||
self.suspended: deque = deque()
|
||||
|
||||
def __repr__(self):
|
||||
"""
|
||||
|
@ -159,7 +162,7 @@ class AsyncScheduler:
|
|||
Returns True if there is no work to do
|
||||
"""
|
||||
|
||||
return not any([self.paused, self.run_ready, self.selector.get_map()])
|
||||
return not any([self.paused, self.run_ready, self.selector.get_map(), self.suspended])
|
||||
|
||||
def shutdown(self):
|
||||
"""
|
||||
|
@ -355,6 +358,7 @@ class AsyncScheduler:
|
|||
|
||||
if self.current_task.last_io:
|
||||
self.io_release_task(self.current_task)
|
||||
self.suspended.append(self.current_task)
|
||||
|
||||
def reschedule_running(self):
|
||||
"""
|
||||
|
@ -450,6 +454,7 @@ class AsyncScheduler:
|
|||
|
||||
for task in tasks:
|
||||
self.paused.discard(task)
|
||||
self.suspended.remove(task)
|
||||
self.run_ready.extend(tasks)
|
||||
self.reschedule_running()
|
||||
|
||||
|
|
|
@ -34,8 +34,9 @@ class Event:
|
|||
|
||||
self.set = False
|
||||
self.waiters = set()
|
||||
self.value = None
|
||||
|
||||
async def trigger(self):
|
||||
async def trigger(self, value: Optional[Any] = None):
|
||||
"""
|
||||
Sets the event, waking up all tasks that called
|
||||
pause() on it
|
||||
|
@ -43,6 +44,7 @@ class Event:
|
|||
|
||||
if self.set:
|
||||
raise GiambioError("The event has already been set")
|
||||
self.value = value
|
||||
await event_set(self)
|
||||
|
||||
async def wait(self):
|
||||
|
@ -50,15 +52,15 @@ class Event:
|
|||
Waits until the event is set
|
||||
"""
|
||||
|
||||
await event_wait(self)
|
||||
return await event_wait(self)
|
||||
|
||||
|
||||
class Queue:
|
||||
"""
|
||||
An asynchronous FIFO queue similar to asyncio.Queue
|
||||
that uses a collections.deque object for the underlying
|
||||
data representation. This queue is *NOT* thread-safe
|
||||
|
||||
data representation. This queue is *NOT* thread-safe as
|
||||
it is based on giambio's Event mechanism
|
||||
"""
|
||||
|
||||
def __init__(self, maxsize: Optional[int] = None):
|
||||
|
@ -80,17 +82,12 @@ class Queue:
|
|||
"""
|
||||
|
||||
if not self.maxsize or len(self.container) < self.maxsize:
|
||||
self.container.append(item)
|
||||
if self.getters:
|
||||
task = self.getters.popleft()
|
||||
loop = await current_loop()
|
||||
loop._data[task] = item
|
||||
await schedule_tasks([task])
|
||||
else:
|
||||
self.container.append(item)
|
||||
await self.getters.popleft().trigger(self.container.popleft())
|
||||
else:
|
||||
self.putters.append(await current_task())
|
||||
print(self.putters)
|
||||
await suspend()
|
||||
self.putters.append(Event())
|
||||
await self.putters[-1].wait()
|
||||
|
||||
|
||||
async def get(self) -> Any:
|
||||
|
@ -102,8 +99,8 @@ class Queue:
|
|||
|
||||
if self.container:
|
||||
if self.putters:
|
||||
await schedule_tasks([self.putters.popleft()])
|
||||
await self.putters.popleft().trigger()
|
||||
return self.container.popleft()
|
||||
else:
|
||||
self.getters.append(await current_task())
|
||||
return await suspend()
|
||||
self.getters.append(Event())
|
||||
return await self.getters[-1].wait()
|
|
@ -175,6 +175,15 @@ async def want_write(stream):
|
|||
await create_trap("register_sock", stream, "write")
|
||||
|
||||
|
||||
async def schedule_tasks(tasks: Iterable[Task]):
|
||||
"""
|
||||
Schedules a list of tasks for execution. Usuaully
|
||||
used to unsuspend them after they called suspend()
|
||||
"""
|
||||
|
||||
await create_trap("schedule_tasks", tasks)
|
||||
|
||||
|
||||
async def event_wait(event):
|
||||
"""
|
||||
Notifies the event loop that the current task has to wait
|
||||
|
@ -186,7 +195,7 @@ async def event_wait(event):
|
|||
return
|
||||
task = await current_task()
|
||||
event.waiters.add(task)
|
||||
await create_trap("suspend", task)
|
||||
return await create_trap("suspend")
|
||||
|
||||
|
||||
async def event_set(event):
|
||||
|
@ -196,12 +205,7 @@ async def event_set(event):
|
|||
"""
|
||||
|
||||
event.set = True
|
||||
loop = await current_loop()
|
||||
for waiter in event.waiters:
|
||||
loop._data[waiter] = event.value
|
||||
await schedule_tasks(event.waiters)
|
||||
|
||||
|
||||
async def schedule_tasks(tasks: Iterable[Task]):
|
||||
"""
|
||||
Schedules a list of tasks for execution
|
||||
"""
|
||||
|
||||
await create_trap("schedule_tasks", tasks)
|
||||
|
|
|
@ -60,7 +60,7 @@ async def handler(sock: AsyncSocket, client_address: tuple):
|
|||
logging.info(f"Sending {data!r} to {':'.join(map(str, await client_sock.getpeername()))}")
|
||||
if client_sock != sock:
|
||||
await client_sock.send_all(data)
|
||||
logging.info(f"Echoed back {data!r} to {i} clients")
|
||||
logging.info(f"Sent {data!r} to {i} clients")
|
||||
logging.info(f"Connection from {address} closed")
|
||||
|
||||
|
||||
|
|
|
@ -8,9 +8,9 @@ import giambio
|
|||
async def child(ev: giambio.Event, pause: int):
|
||||
print("[child] Child is alive! Going to wait until notified")
|
||||
start_total = giambio.clock()
|
||||
await ev.wait()
|
||||
data = await ev.wait()
|
||||
end_pause = giambio.clock() - start_total
|
||||
print(f"[child] Parent set the event, exiting in {pause} seconds")
|
||||
print(f"[child] Parent set the event with value {data}, exiting in {pause} seconds")
|
||||
start_sleep = giambio.clock()
|
||||
await giambio.sleep(pause)
|
||||
end_sleep = giambio.clock() - start_sleep
|
||||
|
@ -26,7 +26,7 @@ async def parent(pause: int = 1):
|
|||
start = giambio.clock()
|
||||
print(f"[parent] Sleeping {pause} second(s) before setting the event")
|
||||
await giambio.sleep(pause)
|
||||
await event.trigger()
|
||||
await event.trigger(True)
|
||||
print("[parent] Event set, awaiting child completion")
|
||||
end = giambio.clock() - start
|
||||
print(f"[parent] Child exited in {end:.2f} seconds")
|
||||
|
|
|
@ -17,13 +17,14 @@ async def consumer(q: giambio.Queue):
|
|||
print("Consumer done")
|
||||
break
|
||||
print(f"Consumed {item}")
|
||||
await giambio.sleep(3)
|
||||
await giambio.sleep(1)
|
||||
|
||||
|
||||
|
||||
async def main(q: giambio.Queue, n: int):
|
||||
async with giambio.create_pool() as pool:
|
||||
await pool.spawn(producer, q, n)
|
||||
await pool.spawn(consumer, q)
|
||||
await pool.spawn(producer, q, n)
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue