Improved the API

This commit is contained in:
nocturn9x 2020-03-25 14:09:39 +00:00
parent 0e29cb7e8a
commit cdbf6d8ce1
5 changed files with 36 additions and 14 deletions

View File

@ -45,12 +45,17 @@ async def main():
print("Spawning countdown immediately, scheduling count for 4 secs from now") print("Spawning countdown immediately, scheduling count for 4 secs from now")
task = loop.spawn(countdown(8)) task = loop.spawn(countdown(8))
task1 = loop.schedule(count(6, 2), 4) # Schedules the task, it will be ran 4 seconds from now task1 = loop.schedule(count(6, 2), 4) # Schedules the task, it will be ran 4 seconds from now
await giambio.sleep(0) # TODO: Fix this to avoid the need to use a checkpoint before cancelling await giambio.sleep(2) # TODO: Fix this to avoid the need to use a checkpoint before cancelling
await task.cancel() # await task.cancel()
# result = await task.join() # Would raise TaskError! result = await task.join() # Would raise TaskCancelled if the task was cancelled
result = 'Task cancelled' if task.cancelled else task.result.val result = 'Task cancelled' if task.cancelled else task.result.val
result1 = await task1.join() result1 = await task1.join()
print(f"countdown returned: {result}\ncount returned: {result1}") print(f"countdown returned: {result}\ncount returned: {result1}")
print("All done") print("All done")
print("PT. 2 Context Manager")
async with giambio.TaskManager(loop) as manager:
task2 = await manager.spawn(countdown(8))
task3 = await manager.schedule(count(16, 2), 4)
print(manager.values)
loop.start(main) loop.start(main)

View File

@ -25,6 +25,7 @@ class Task:
self.loop = loop # The EventLoop object that spawned the task self.loop = loop # The EventLoop object that spawned the task
self.cancelled = False self.cancelled = False
self.execution = "INIT" self.execution = "INIT"
self.steps = 0 # How many steps did the task do before ending, incremented while executing
def run(self): def run(self):
self.status = True self.status = True
@ -40,8 +41,8 @@ class Task:
async def cancel(self): async def cancel(self):
return await cancel(self) return await cancel(self)
async def join(self): async def join(self, silent=False):
return await join(self) return await join(self, silent)
def get_result(self): def get_result(self):
if self.result: if self.result:

View File

@ -50,6 +50,7 @@ class EventLoop:
try: try:
method, *args = self.running.run() method, *args = self.running.run()
getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;)
self.running.steps += 1
except StopIteration as e: except StopIteration as e:
self.running.execution = "FINISH" self.running.execution = "FINISH"
self.running.result = Result(e.args[0] if e.args else None, None) # Saves the return value self.running.result = Result(e.args[0] if e.args else None, None) # Saves the return value

View File

@ -34,17 +34,21 @@ def want_write(sock: socket.socket):
@types.coroutine @types.coroutine
def join(task): def join(task, silent=False):
"""'Tells' the scheduler that the desired task MUST be awaited for completion""" """'Tells' the scheduler that the desired task MUST be awaited for completion
If silent is True, any exception in the child task will be discarded"""
if task.cancelled: if task.cancelled:
raise TaskCancelled("Cannot join cancelled task!") raise TaskCancelled("cannot join cancelled task!")
if task.execution == "FINISH":
raise TaskFinished("Cannot join already terminated task!")
task.joined = True task.joined = True
yield "want_join", task yield "want_join", task
return task.get_result() # This raises an exception if the child task errored if silent:
try:
return task.get_result() # Exception silenced
except:
return None
else:
return task.get_result() # Will raise
@types.coroutine @types.coroutine
def cancel(task): def cancel(task):
@ -52,3 +56,13 @@ def cancel(task):
yield "want_cancel", task yield "want_cancel", task
@types.coroutine
def join_unfinished(task):
"""Same as join(), but it will raise a TaskFinished exception if the task already ended"""
if task.execution == "FINISH":
raise TaskFinished("task has already ended!")
yield "want_join", task
task.joined = True
return task.get_result()

View File

@ -6,17 +6,18 @@ class TaskManager(Task):
"""Class to be used inside context managers to spawn multiple tasks and be sure that they will all joined before the code exits the with block""" """Class to be used inside context managers to spawn multiple tasks and be sure that they will all joined before the code exits the with block"""
def __init__(self, loop): def __init__(self, loop, silent=False):
self.tasks = deque() # All tasks spawned self.tasks = deque() # All tasks spawned
self.values = {} # Results OR exceptions of each task self.values = {} # Results OR exceptions of each task
self.loop = loop self.loop = loop
self.silent = silent
async def __aenter__(self): async def __aenter__(self):
return self return self
async def __aexit__(self, *args): async def __aexit__(self, *args):
for task in self.tasks: for task in self.tasks:
self.values[task.coroutine.__name__] = await task.join() self.values[task.coroutine.__name__] = await task.join(self.silent)
async def spawn(self, coro): async def spawn(self, coro):
task = self.loop.spawn(coro) task = self.loop.spawn(coro)