Locks stuff + fixes + bugs

This commit is contained in:
Nocturn9x 2022-05-14 11:19:55 +02:00
parent ad34be8754
commit 66d7c51268
13 changed files with 108 additions and 35 deletions

View File

@ -55,7 +55,7 @@ class TaskManager:
self._proper_init = False self._proper_init = False
self.enclosed_pool: Optional["giambio.context.TaskManager"] = None self.enclosed_pool: Optional["giambio.context.TaskManager"] = None
self.raise_on_timeout: bool = raise_on_timeout self.raise_on_timeout: bool = raise_on_timeout
self.entry_point: Optional[Task] = None self.entry_point: Optional[giambio.Task] = None
async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task": async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task":
""" """

View File

@ -17,6 +17,7 @@ limitations under the License.
""" """
# Import libraries and internal resources # Import libraries and internal resources
from lib2to3.pytree import Base
import types import types
from giambio.task import Task from giambio.task import Task
from collections import deque from collections import deque
@ -298,7 +299,10 @@ class AsyncScheduler:
# We need to make sure we don't try to execute # We need to make sure we don't try to execute
# exited tasks that are on the running queue # exited tasks that are on the running queue
return return
if not self.current_pool: if self.current_pool:
if self.current_task.pool and self.current_task.pool is not self.current_pool:
self.current_task.pool.enclosed_pool = self.current_pool
else:
self.current_pool = self.current_task.pool self.current_pool = self.current_task.pool
pool = self.current_pool pool = self.current_pool
while pool: while pool:
@ -462,7 +466,8 @@ class AsyncScheduler:
for task in tasks: for task in tasks:
self.paused.discard(task) self.paused.discard(task)
self.suspended.remove(task) if task in self.suspended:
self.suspended.remove(task)
self.run_ready.extend(tasks) self.run_ready.extend(tasks)
self.reschedule_running() self.reschedule_running()
@ -493,7 +498,7 @@ class AsyncScheduler:
:return: The closest deadline according to our clock :return: The closest deadline according to our clock
:rtype: float :rtype: float
""" """
if not self.deadlines: if not self.deadlines:
# If there are no deadlines just wait until the first task wakeup # If there are no deadlines just wait until the first task wakeup
timeout = max(0.0, self.paused.get_closest_deadline() - self.clock()) timeout = max(0.0, self.paused.get_closest_deadline() - self.clock())
@ -616,8 +621,9 @@ class AsyncScheduler:
If ensure_done equals False, the loop will cancel ALL If ensure_done equals False, the loop will cancel ALL
running and scheduled tasks and then tear itself down. running and scheduled tasks and then tear itself down.
If ensure_done equals True, which is the default behavior, If ensure_done equals True, which is the default behavior,
this method will raise a GiambioError if the loop hasn't this method will raise a GiambioError exception if the loop
finished running. hasn't finished running. The state of the event loop is reset
so it can be reused with another run() call
""" """
if ensure_done: if ensure_done:
@ -644,18 +650,11 @@ class AsyncScheduler:
if task.pool and task.pool.enclosed_pool and not task.pool.enclosed_pool.done(): if task.pool and task.pool.enclosed_pool and not task.pool.enclosed_pool.done():
return return
for t in task.joiners: self.run_ready.extend(task.joiners)
if t not in self.run_ready:
# Since a task can be the parent
# of multiple children, we need to
# make sure we reschedule it only
# once, otherwise a RuntimeError will
# occur
self.run_ready.append(t)
def join(self, task: Task): def join(self, task: Task):
""" """
Joins a task to its callers (implicitly, the parent Joins a task to its callers (implicitly the parent
task, but also every other task who called await task, but also every other task who called await
task.join() on the task object) task.join() on the task object)
""" """
@ -668,8 +667,9 @@ class AsyncScheduler:
self.io_release_task(task) self.io_release_task(task)
if task in self.suspended: if task in self.suspended:
self.suspended.remove(task) self.suspended.remove(task)
# If the pool has finished executing or we're at the first parent # If the pool (including any enclosing pools) has finished executing
# task that kicked the loop, we can safely reschedule the parent(s) # or we're at the first task that kicked the loop, we can safely
# reschedule the parent(s)
if task.pool is None: if task.pool is None:
return return
if task.pool.done(): if task.pool.done():
@ -680,7 +680,7 @@ class AsyncScheduler:
task.status = "crashed" task.status = "crashed"
if task.exc.__traceback__: if task.exc.__traceback__:
# TODO: We might want to do a bit more complex traceback hacking to remove any extra # TODO: We might want to do a bit more complex traceback hacking to remove any extra
# frames from the exception call stack, but for now removing at least the first one # frames from the exception call stack, but for now removing at least the first few
# seems a sensible approach (it's us catching it so we don't care about that) # seems a sensible approach (it's us catching it so we don't care about that)
for _ in range(5): for _ in range(5):
if task.exc.__traceback__.tb_next: if task.exc.__traceback__.tb_next:

View File

@ -273,7 +273,7 @@ class DeadlinesQueue:
:param pool: The pool object to store :param pool: The pool object to store
""" """
if pool and pool not in self.pools and not pool.done() and not pool.timed_out and pool.timeout: if pool and pool not in self.pools and not pool.timed_out and pool.timeout:
self.pools.add(pool) self.pools.add(pool)
heappush(self.container, (pool.timeout, self.sequence, pool)) heappush(self.container, (pool.timeout, self.sequence, pool))
self.sequence += 1 self.sequence += 1

View File

@ -95,19 +95,16 @@ def create_pool():
return TaskManager() return TaskManager()
def with_timeout(timeout: int or float): def with_timeout(timeout: int or float):
""" """
Creates an async pool with an associated timeout Creates an async pool with an associated timeout
""" """
assert timeout > 0, "The timeout must be greater than 0" assert timeout > 0, "The timeout must be greater than 0"
mgr = TaskManager(timeout)
loop = get_event_loop() loop = get_event_loop()
mgr = TaskManager(timeout) if loop.current_task is loop.entry_point:
if loop.current_task is not loop.entry_point: loop.current_pool = mgr
mgr.tasks.append(loop.current_task)
if loop.current_pool and loop.current_pool is not mgr:
loop.current_pool.enclosed_pool = mgr
return mgr return mgr
@ -119,11 +116,8 @@ def skip_after(timeout: int or float):
""" """
assert timeout > 0, "The timeout must be greater than 0" assert timeout > 0, "The timeout must be greater than 0"
mgr = TaskManager(timeout, False)
loop = get_event_loop() loop = get_event_loop()
mgr = TaskManager(timeout, False) if loop.current_task is loop.entry_point:
if loop.current_task is not loop.entry_point: loop.current_pool = mgr
mgr.tasks.append(loop.current_task)
if loop.current_pool and loop.current_pool is not mgr:
loop.current_pool.enclosed_pool = mgr
return mgr return mgr

View File

@ -11,10 +11,11 @@ async def child(name: int):
async def main(): async def main():
start = giambio.clock() start = giambio.clock()
async with giambio.create_pool() as pool: async with giambio.create_pool() as pool:
await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately! # await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately!
task = await pool.spawn(child, 2) task = await pool.spawn(child, 2)
await task.cancel()
print("[main] Children spawned, awaiting completion") print("[main] Children spawned, awaiting completion")
await task.cancel()
print("[main] Second child cancelled")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")

41
tests/lock.py Normal file
View File

@ -0,0 +1,41 @@
from debugger import Debugger
import giambio
async def child(start: int, limit: int, q: giambio.Queue, l: giambio.Lock):
async with l: # If you comment this line, the resulting queue will
# be all mixed up!
# Locked! If any other task
# tries to acquire the lock,
# they'll wait for us to finish
while start <= limit:
print(f"[locked ({limit})] {start}")
await q.put(start)
start += 1
await giambio.sleep(1)
async def other(stop: int):
# Other tasks are unaffected
# by the lock if they don't
# acquire it
n = stop
while n:
print(f"[unlocked ({stop})] {n}")
n -= 1
await giambio.sleep(1)
async def main():
queue = giambio.Queue()
lock = giambio.Lock()
async with giambio.create_pool() as pool:
await pool.spawn(child, 1, 5, queue, lock)
await pool.spawn(child, 6, 10, queue, lock)
await pool.spawn(other, 10)
await pool.spawn(other, 5)
print(f"Result: {queue}") # Queue is ordered!
print("[main] Done")
giambio.run(main, debugger=())

View File

@ -36,7 +36,7 @@ async def main():
print("[main] First 2 children spawned, awaiting completion") print("[main] First 2 children spawned, awaiting completion")
async with giambio.create_pool() as new_pool: async with giambio.create_pool() as new_pool:
# This pool will be cancelled by the exception # This pool will be cancelled by the exception
# in the other pool # in the outer pool
await new_pool.spawn(child2) await new_pool.spawn(child2)
await new_pool.spawn(child3) await new_pool.spawn(child3)
print("[main] Third and fourth children spawned") print("[main] Third and fourth children spawned")

View File

@ -1,3 +1,4 @@
## Example for nested task pools
import giambio import giambio
from debugger import Debugger from debugger import Debugger

View File

@ -1,3 +1,5 @@
## Two-way proxy example stolen from njsmith's talk about trio
from debugger import Debugger from debugger import Debugger
import giambio import giambio
import socket import socket

View File

@ -1,3 +1,4 @@
## Producer-consumer code using giambio's async queue
import giambio import giambio
from debugger import Debugger from debugger import Debugger

View File

@ -1,4 +1,4 @@
## SImple task IPC using giambio's MemoryChannel class ## Simple task IPC using giambio's MemoryChannel class
import random import random
import string import string
import giambio import giambio

View File

@ -1,3 +1,4 @@
## Simple task IPC using giambio's NetworkChannel class
import random import random
import string import string
import giambio import giambio

32
tests/timeout3.py Normal file
View File

@ -0,0 +1,32 @@
import giambio
from debugger import Debugger
async def child(name: int):
print(f"[child {name}] Child spawned!! Sleeping for {name} seconds")
await giambio.sleep(name)
print(f"[child {name}] Had a nice nap!")
async def worker(coro, *args):
try:
async with giambio.with_timeout(10):
await coro(*args)
except giambio.exceptions.TooSlowError:
return True
return False
async def main():
start = giambio.clock()
async with giambio.skip_after(10) as pool:
t = await pool.spawn(worker, child, 7)
await giambio.sleep(2)
t2 = await pool.spawn(worker, child, 15)
if any([await t.join(), await t2.join()]):
print("[main] One or more children have timed out!")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":
giambio.run(main, debugger=())