mirror of https://github.com/nocturn9x/giambio.git
Merge remote-tracking branch 'origin/master'
# Conflicts: # tests/timeout3.py
This commit is contained in:
commit
ec9c4cf1c9
|
@ -55,7 +55,7 @@ class TaskManager:
|
||||||
self._proper_init = False
|
self._proper_init = False
|
||||||
self.enclosed_pool: Optional["giambio.context.TaskManager"] = None
|
self.enclosed_pool: Optional["giambio.context.TaskManager"] = None
|
||||||
self.raise_on_timeout: bool = raise_on_timeout
|
self.raise_on_timeout: bool = raise_on_timeout
|
||||||
self.entry_point: Optional[Task] = None
|
self.entry_point: Optional[giambio.Task] = None
|
||||||
|
|
||||||
async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task":
|
async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task":
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Import libraries and internal resources
|
# Import libraries and internal resources
|
||||||
|
from lib2to3.pytree import Base
|
||||||
import types
|
import types
|
||||||
from giambio.task import Task
|
from giambio.task import Task
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
@ -298,7 +299,10 @@ class AsyncScheduler:
|
||||||
# We need to make sure we don't try to execute
|
# We need to make sure we don't try to execute
|
||||||
# exited tasks that are on the running queue
|
# exited tasks that are on the running queue
|
||||||
return
|
return
|
||||||
if not self.current_pool:
|
if self.current_pool:
|
||||||
|
if self.current_task.pool and self.current_task.pool is not self.current_pool:
|
||||||
|
self.current_task.pool.enclosed_pool = self.current_pool
|
||||||
|
else:
|
||||||
self.current_pool = self.current_task.pool
|
self.current_pool = self.current_task.pool
|
||||||
pool = self.current_pool
|
pool = self.current_pool
|
||||||
while pool:
|
while pool:
|
||||||
|
@ -462,7 +466,8 @@ class AsyncScheduler:
|
||||||
|
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
self.paused.discard(task)
|
self.paused.discard(task)
|
||||||
self.suspended.remove(task)
|
if task in self.suspended:
|
||||||
|
self.suspended.remove(task)
|
||||||
self.run_ready.extend(tasks)
|
self.run_ready.extend(tasks)
|
||||||
self.reschedule_running()
|
self.reschedule_running()
|
||||||
|
|
||||||
|
@ -493,7 +498,7 @@ class AsyncScheduler:
|
||||||
:return: The closest deadline according to our clock
|
:return: The closest deadline according to our clock
|
||||||
:rtype: float
|
:rtype: float
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not self.deadlines:
|
if not self.deadlines:
|
||||||
# If there are no deadlines just wait until the first task wakeup
|
# If there are no deadlines just wait until the first task wakeup
|
||||||
timeout = max(0.0, self.paused.get_closest_deadline() - self.clock())
|
timeout = max(0.0, self.paused.get_closest_deadline() - self.clock())
|
||||||
|
@ -616,8 +621,9 @@ class AsyncScheduler:
|
||||||
If ensure_done equals False, the loop will cancel ALL
|
If ensure_done equals False, the loop will cancel ALL
|
||||||
running and scheduled tasks and then tear itself down.
|
running and scheduled tasks and then tear itself down.
|
||||||
If ensure_done equals True, which is the default behavior,
|
If ensure_done equals True, which is the default behavior,
|
||||||
this method will raise a GiambioError if the loop hasn't
|
this method will raise a GiambioError exception if the loop
|
||||||
finished running.
|
hasn't finished running. The state of the event loop is reset
|
||||||
|
so it can be reused with another run() call
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if ensure_done:
|
if ensure_done:
|
||||||
|
@ -644,18 +650,11 @@ class AsyncScheduler:
|
||||||
|
|
||||||
if task.pool and task.pool.enclosed_pool and not task.pool.enclosed_pool.done():
|
if task.pool and task.pool.enclosed_pool and not task.pool.enclosed_pool.done():
|
||||||
return
|
return
|
||||||
for t in task.joiners:
|
self.run_ready.extend(task.joiners)
|
||||||
if t not in self.run_ready:
|
|
||||||
# Since a task can be the parent
|
|
||||||
# of multiple children, we need to
|
|
||||||
# make sure we reschedule it only
|
|
||||||
# once, otherwise a RuntimeError will
|
|
||||||
# occur
|
|
||||||
self.run_ready.append(t)
|
|
||||||
|
|
||||||
def join(self, task: Task):
|
def join(self, task: Task):
|
||||||
"""
|
"""
|
||||||
Joins a task to its callers (implicitly, the parent
|
Joins a task to its callers (implicitly the parent
|
||||||
task, but also every other task who called await
|
task, but also every other task who called await
|
||||||
task.join() on the task object)
|
task.join() on the task object)
|
||||||
"""
|
"""
|
||||||
|
@ -668,8 +667,9 @@ class AsyncScheduler:
|
||||||
self.io_release_task(task)
|
self.io_release_task(task)
|
||||||
if task in self.suspended:
|
if task in self.suspended:
|
||||||
self.suspended.remove(task)
|
self.suspended.remove(task)
|
||||||
# If the pool has finished executing or we're at the first parent
|
# If the pool (including any enclosing pools) has finished executing
|
||||||
# task that kicked the loop, we can safely reschedule the parent(s)
|
# or we're at the first task that kicked the loop, we can safely
|
||||||
|
# reschedule the parent(s)
|
||||||
if task.pool is None:
|
if task.pool is None:
|
||||||
return
|
return
|
||||||
if task.pool.done():
|
if task.pool.done():
|
||||||
|
@ -680,7 +680,7 @@ class AsyncScheduler:
|
||||||
task.status = "crashed"
|
task.status = "crashed"
|
||||||
if task.exc.__traceback__:
|
if task.exc.__traceback__:
|
||||||
# TODO: We might want to do a bit more complex traceback hacking to remove any extra
|
# TODO: We might want to do a bit more complex traceback hacking to remove any extra
|
||||||
# frames from the exception call stack, but for now removing at least the first one
|
# frames from the exception call stack, but for now removing at least the first few
|
||||||
# seems a sensible approach (it's us catching it so we don't care about that)
|
# seems a sensible approach (it's us catching it so we don't care about that)
|
||||||
for _ in range(5):
|
for _ in range(5):
|
||||||
if task.exc.__traceback__.tb_next:
|
if task.exc.__traceback__.tb_next:
|
||||||
|
|
|
@ -287,7 +287,7 @@ class DeadlinesQueue:
|
||||||
:param pool: The pool object to store
|
:param pool: The pool object to store
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if pool and pool not in self.pools and not pool.done() and not pool.timed_out and pool.timeout:
|
if pool and pool not in self.pools and not pool.timed_out and pool.timeout:
|
||||||
self.pools.add(pool)
|
self.pools.add(pool)
|
||||||
heappush(self.container, (pool.timeout, self.sequence, pool))
|
heappush(self.container, (pool.timeout, self.sequence, pool))
|
||||||
self.sequence += 1
|
self.sequence += 1
|
||||||
|
|
|
@ -95,19 +95,16 @@ def create_pool():
|
||||||
return TaskManager()
|
return TaskManager()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def with_timeout(timeout: int or float):
|
def with_timeout(timeout: int or float):
|
||||||
"""
|
"""
|
||||||
Creates an async pool with an associated timeout
|
Creates an async pool with an associated timeout
|
||||||
"""
|
"""
|
||||||
|
|
||||||
assert timeout > 0, "The timeout must be greater than 0"
|
assert timeout > 0, "The timeout must be greater than 0"
|
||||||
|
mgr = TaskManager(timeout)
|
||||||
loop = get_event_loop()
|
loop = get_event_loop()
|
||||||
mgr = TaskManager(timeout)
|
if loop.current_task is loop.entry_point:
|
||||||
if loop.current_task is not loop.entry_point:
|
loop.current_pool = mgr
|
||||||
mgr.tasks.append(loop.current_task)
|
|
||||||
if loop.current_pool and loop.current_pool is not mgr:
|
|
||||||
loop.current_pool.enclosed_pool = mgr
|
|
||||||
return mgr
|
return mgr
|
||||||
|
|
||||||
|
|
||||||
|
@ -119,11 +116,8 @@ def skip_after(timeout: int or float):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
assert timeout > 0, "The timeout must be greater than 0"
|
assert timeout > 0, "The timeout must be greater than 0"
|
||||||
|
mgr = TaskManager(timeout, False)
|
||||||
loop = get_event_loop()
|
loop = get_event_loop()
|
||||||
mgr = TaskManager(timeout, False)
|
if loop.current_task is loop.entry_point:
|
||||||
if loop.current_task is not loop.entry_point:
|
loop.current_pool = mgr
|
||||||
mgr.tasks.append(loop.current_task)
|
|
||||||
if loop.current_pool and loop.current_pool is not mgr:
|
|
||||||
loop.current_pool.enclosed_pool = mgr
|
|
||||||
return mgr
|
return mgr
|
||||||
|
|
||||||
|
|
|
@ -11,10 +11,11 @@ async def child(name: int):
|
||||||
async def main():
|
async def main():
|
||||||
start = giambio.clock()
|
start = giambio.clock()
|
||||||
async with giambio.create_pool() as pool:
|
async with giambio.create_pool() as pool:
|
||||||
await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately!
|
# await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately!
|
||||||
task = await pool.spawn(child, 2)
|
task = await pool.spawn(child, 2)
|
||||||
await task.cancel()
|
|
||||||
print("[main] Children spawned, awaiting completion")
|
print("[main] Children spawned, awaiting completion")
|
||||||
|
await task.cancel()
|
||||||
|
print("[main] Second child cancelled")
|
||||||
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
|
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
from debugger import Debugger
|
||||||
|
import giambio
|
||||||
|
|
||||||
|
|
||||||
|
async def child(start: int, limit: int, q: giambio.Queue, l: giambio.Lock):
|
||||||
|
async with l: # If you comment this line, the resulting queue will
|
||||||
|
# be all mixed up!
|
||||||
|
# Locked! If any other task
|
||||||
|
# tries to acquire the lock,
|
||||||
|
# they'll wait for us to finish
|
||||||
|
while start <= limit:
|
||||||
|
print(f"[locked ({limit})] {start}")
|
||||||
|
await q.put(start)
|
||||||
|
start += 1
|
||||||
|
await giambio.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
async def other(stop: int):
|
||||||
|
# Other tasks are unaffected
|
||||||
|
# by the lock if they don't
|
||||||
|
# acquire it
|
||||||
|
n = stop
|
||||||
|
while n:
|
||||||
|
print(f"[unlocked ({stop})] {n}")
|
||||||
|
n -= 1
|
||||||
|
await giambio.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
queue = giambio.Queue()
|
||||||
|
lock = giambio.Lock()
|
||||||
|
async with giambio.create_pool() as pool:
|
||||||
|
await pool.spawn(child, 1, 5, queue, lock)
|
||||||
|
await pool.spawn(child, 6, 10, queue, lock)
|
||||||
|
await pool.spawn(other, 10)
|
||||||
|
await pool.spawn(other, 5)
|
||||||
|
print(f"Result: {queue}") # Queue is ordered!
|
||||||
|
print("[main] Done")
|
||||||
|
|
||||||
|
|
||||||
|
giambio.run(main, debugger=())
|
|
@ -36,7 +36,7 @@ async def main():
|
||||||
print("[main] First 2 children spawned, awaiting completion")
|
print("[main] First 2 children spawned, awaiting completion")
|
||||||
async with giambio.create_pool() as new_pool:
|
async with giambio.create_pool() as new_pool:
|
||||||
# This pool will be cancelled by the exception
|
# This pool will be cancelled by the exception
|
||||||
# in the other pool
|
# in the outer pool
|
||||||
await new_pool.spawn(child2)
|
await new_pool.spawn(child2)
|
||||||
await new_pool.spawn(child3)
|
await new_pool.spawn(child3)
|
||||||
print("[main] Third and fourth children spawned")
|
print("[main] Third and fourth children spawned")
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
## Example for nested task pools
|
||||||
import giambio
|
import giambio
|
||||||
from debugger import Debugger
|
from debugger import Debugger
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
## Two-way proxy example stolen from njsmith's talk about trio
|
||||||
|
|
||||||
from debugger import Debugger
|
from debugger import Debugger
|
||||||
import giambio
|
import giambio
|
||||||
import socket
|
import socket
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
## Producer-consumer code using giambio's async queue
|
||||||
import giambio
|
import giambio
|
||||||
from debugger import Debugger
|
from debugger import Debugger
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
## SImple task IPC using giambio's MemoryChannel class
|
## Simple task IPC using giambio's MemoryChannel class
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
import giambio
|
import giambio
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
## Simple task IPC using giambio's NetworkChannel class
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
import giambio
|
import giambio
|
||||||
|
|
|
@ -6,11 +6,24 @@ async def child(name: int):
|
||||||
print(f"[child {name}] Child spawned!! Sleeping for {name} seconds")
|
print(f"[child {name}] Child spawned!! Sleeping for {name} seconds")
|
||||||
await giambio.sleep(name)
|
await giambio.sleep(name)
|
||||||
print(f"[child {name}] Had a nice nap!")
|
print(f"[child {name}] Had a nice nap!")
|
||||||
|
<<<<<<< HEAD
|
||||||
return name
|
return name
|
||||||
|
=======
|
||||||
|
|
||||||
|
|
||||||
|
async def worker(coro, *args):
|
||||||
|
try:
|
||||||
|
async with giambio.with_timeout(10):
|
||||||
|
await coro(*args)
|
||||||
|
except giambio.exceptions.TooSlowError:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
>>>>>>> origin/master
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
start = giambio.clock()
|
start = giambio.clock()
|
||||||
|
<<<<<<< HEAD
|
||||||
try:
|
try:
|
||||||
async with giambio.with_timeout(5) as pool:
|
async with giambio.with_timeout(5) as pool:
|
||||||
task = await pool.spawn(child, 2)
|
task = await pool.spawn(child, 2)
|
||||||
|
@ -24,3 +37,16 @@ async def main():
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
giambio.run(main, debugger=())
|
giambio.run(main, debugger=())
|
||||||
|
=======
|
||||||
|
async with giambio.skip_after(10) as pool:
|
||||||
|
t = await pool.spawn(worker, child, 7)
|
||||||
|
await giambio.sleep(2)
|
||||||
|
t2 = await pool.spawn(worker, child, 15)
|
||||||
|
if any([await t.join(), await t2.join()]):
|
||||||
|
print("[main] One or more children have timed out!")
|
||||||
|
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
giambio.run(main, debugger=())
|
||||||
|
>>>>>>> origin/master
|
||||||
|
|
Loading…
Reference in New Issue