Fixed join mechanism

This commit is contained in:
nocturn9x 2020-03-23 17:52:17 +00:00
parent e8bef73f36
commit e55e3daa6c
4 changed files with 46 additions and 31 deletions

View File

@ -40,11 +40,12 @@ async def count(stop, step=1):
async def main(): async def main():
print("Spawning countdown immediately, scheduling count for 2 secs from now") print("Spawning countdown immediately, scheduling count for 2 secs from now")
task = loop.spawn(countdown(8)) task = loop.spawn(countdown(8))
task1 = loop.schedule(count(8, 2), 2) task1 = loop.spawn(count(8, 2))
await giambio.sleep(2) # Wait before cancelling await giambio.sleep(2)
# await task.cancel() # Cancel the task await task.cancel()
result = await task1.join() # Joining multiple tasks still causes problems result = await task1.join() # Joining multiple tasks still causes problems
# result1 = await task.join() result1 = await task.join()
print(result, result1)
print("All done") print("All done")
loop.start(main) loop.start(main)

BIN
giambio/.core.cover.swp Normal file

Binary file not shown.

View File

@ -49,16 +49,12 @@ class EventLoop:
method, *args = self.running.run() # Sneaky method call, thanks to David Beazley for this ;) method, *args = self.running.run() # Sneaky method call, thanks to David Beazley for this ;)
getattr(self, method)(*args) getattr(self, method)(*args)
except StopIteration as e: # TODO: Fix this return mechanism, it looks like the return value of the child task gets "replaced" by None at some point except StopIteration as e: # TODO: Fix this return mechanism, it looks like the return value of the child task gets "replaced" by None at some point
self.running.ret_value = e.args[0] if e.args else None # Saves the return value self.running.result = Result(e.args[0] if e.args else None, None) # Saves the return value
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
except RuntimeError:
self.running.cancelled = True
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
print(self.to_run)
except Exception as has_raised: except Exception as has_raised:
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
if self.running.joined: # Let the join function handle the hassle of propagating the error if self.running.joined: # Let the join function handle the hassle of propagating the error
self.running.exception = has_raised # Save the exception self.running.result = Result(None, has_raised) # Save the exception
else: # Let the exception propagate (I'm looking at you asyncIO ;)) else: # Let the exception propagate (I'm looking at you asyncIO ;))
raise raise
except KeyboardInterrupt: except KeyboardInterrupt:
@ -67,11 +63,11 @@ class EventLoop:
def spawn(self, coroutine: types.coroutine): def spawn(self, coroutine: types.coroutine):
"""Schedules a task for execution, appending it to the call stack""" """Schedules a task for execution, appending it to the call stack"""
task = Task(coroutine) task = Task(coroutine, self)
self.to_run.append(task) self.to_run.append(task)
return task return task
def schedule(self, coroutine: types.coroutine, when: int): def schedule(self, coroutine: types.coroutine, when: int): # TODO: Fix this
"""Schedules a task for execution after n seconds""" """Schedules a task for execution after n seconds"""
self.sequence += 1 self.sequence += 1
@ -131,31 +127,50 @@ class EventLoop:
heappush(self.paused, (self.clock() + seconds, self.sequence, self.running)) heappush(self.paused, (self.clock() + seconds, self.sequence, self.running))
def want_cancel(self, task): def want_cancel(self, task):
task.coroutine.throw(CancelledError) self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
task.cancelled = True
task.throw(CancelledError())
async def connect_sock(self, sock: socket.socket, addr: tuple): async def connect_sock(self, sock: socket.socket, addr: tuple):
await want_write(sock) await want_write(sock)
return sock.connect(addr) return sock.connect(addr)
class Result:
"""A wrapper for results of coroutines"""
def __init__(self, val=None, exc: Exception = None):
self.val = val
self.exc = exc
def __repr__(self):
return f"giambio.core.Result({self.val}, {self.exc})"
class Task: class Task:
"""A simple wrapper around a coroutine object""" """A simple wrapper around a coroutine object"""
def __init__(self, coroutine: types.coroutine): def __init__(self, coroutine: types.coroutine, loop: EventLoop):
self.coroutine = coroutine self.coroutine = coroutine
self.status = False # Not ran yet self.status = False # Not ran yet
self.joined = False self.joined = False
self.ret_val = None # Return value is saved here self.result = None # Updated when the coroutine execution ends
self.exception = None # If errored, the exception is saved here self.loop = loop # The EventLoop object that spawned the task
self.cancelled = False # When cancelled, this is True
def run(self): def run(self):
self.status = True self.status = True
return self.coroutine.send(None) try:
return self.coroutine.send(None)
except RuntimeError:
print(self.loop.to_run)
def __repr__(self): def __repr__(self):
return f"giambio.core.Task({self.coroutine}, {self.status}, {self.joined}, {self.ret_val}, {self.exception}, {self.cancelled})" return f"giambio.core.Task({self.coroutine}, {self.status}, {self.joined}, {self.result})"
def throw(self, exception: Exception):
return self.coroutine.throw(exception)
async def cancel(self): async def cancel(self):
return await cancel(self) return await cancel(self)
@ -163,6 +178,13 @@ class Task:
async def join(self): async def join(self):
return await join(self) return await join(self)
def get_result(self):
if self.result:
if self.result.exc:
raise self.result.exc
else:
return self.result.val
@types.coroutine @types.coroutine
def sleep(seconds: int): def sleep(seconds: int):
@ -192,20 +214,9 @@ def join(task: Task):
task.joined = True task.joined = True
yield "want_join", task yield "want_join", task
if task.exception: return task.get_result()
print("Traceback (most recent call last):")
traceback.print_tb(task.exception.__traceback__)
exception_name = type(task.exception).__name__
if str(task.exception):
print(f"{exception_name}: {task.exception}")
else:
print(task.exception)
raise task.exception
return task.ret_val
@types.coroutine @types.coroutine
def cancel(task: Task): def cancel(task: Task):
yield "want_cancel", task yield "want_cancel", task
assert task.cancelled

View File

@ -9,3 +9,6 @@ class AlreadyJoinedError(GiambioError):
class CancelledError(GiambioError): class CancelledError(GiambioError):
"""Exception raised as a result of the giambio.core.cancel() method""" """Exception raised as a result of the giambio.core.cancel() method"""
def __repr__(self):
return "giambio.exceptions.CancelledError"