Many fixes, still exceptions do not propagate properly, but tasks are cancelled when a task dies

This commit is contained in:
nocturn9x 2020-04-04 16:43:07 +00:00
parent f9eb329ec9
commit d2d43235c9
5 changed files with 30 additions and 33 deletions

View File

@ -10,7 +10,7 @@ What works and what does not (25th March 2020 20:35)
- Join mechanism: V - Join mechanism: V
- Sleep mechanism: V - Sleep mechanism: V
- Cancellation mechanism: V - Cancellation mechanism: V
- Exception propagation: V - Exception propagation: X
- Concurrent I/O: V - Concurrent I/O: V
- Return values of coroutines: V - Return values of coroutines: V
- Scheduling tasks for future execution: V - Scheduling tasks for future execution: V
@ -23,6 +23,8 @@ async def countdown(n):
while n > 0: while n > 0:
print(f"Down {n}") print(f"Down {n}")
n -= 1 n -= 1
if n <= 2: # Test an exception that triggers only sometimes
raise ValueError
await giambio.sleep(1) await giambio.sleep(1)
print("Countdown over") print("Countdown over")
return "Count DOWN over" return "Count DOWN over"
@ -44,16 +46,13 @@ async def count(stop, step=1):
async def main(): async def main():
try:
print("Spawning countdown immediately, scheduling count for 4 secs from now") print("Spawning countdown immediately, scheduling count for 4 secs from now")
async with giambio.TaskManager(loop) as manager: async with giambio.TaskManager(loop) as manager:
task = manager.spawn(countdown(4)) task = manager.spawn(countdown(8))
manager.schedule(count(8, 2), 4) manager.schedule(count(8, 2), 4)
await task.cancel()
for task, ret in manager.values.items(): for task, ret in manager.values.items():
print(f"Function '{task.coroutine.__name__}' at {hex(id(task.coroutine))} returned an object of type '{type(ret).__name__}': {repr(ret)}") print(f"Function '{task.coroutine.__name__}' at {hex(id(task.coroutine))} returned an object of type '{type(ret).__name__}': {repr(ret)}")
except: # TODO: Fix this, see above
pass
loop.start(main) loop.start(main)

BIN
giambio/.exceptions.py.swp Normal file

Binary file not shown.

View File

@ -10,7 +10,7 @@ class Result:
self.exc = exc self.exc = exc
def __repr__(self): def __repr__(self):
return f"giambio.core.Result({self.val}, {self.exc})" return f"giambio.core.Result({self.val}, {repr(self.exc)})"
class Task: class Task:
@ -19,7 +19,6 @@ class Task:
def __init__(self, coroutine: types.coroutine, loop): def __init__(self, coroutine: types.coroutine, loop):
self.coroutine = coroutine self.coroutine = coroutine
self.status = False # Not ran yet
self.joined = False # True if the task is joined self.joined = False # True if the task is joined
self.result = None # Updated when the coroutine execution ends self.result = None # Updated when the coroutine execution ends
self.loop = loop # The EventLoop object that spawned the task self.loop = loop # The EventLoop object that spawned the task
@ -30,9 +29,11 @@ class Task:
def run(self): def run(self):
"""Simple abstraction layer over the coroutines ``send`` method""" """Simple abstraction layer over the coroutines ``send`` method"""
self.status = True self.steps += 1
self.execution = "RUNNING"
return self.coroutine.send(None) return self.coroutine.send(None)
def __repr__(self): def __repr__(self):
return f"giambio.core.Task({self.coroutine}, {self.status}, {self.joined}, {self.result})" return f"giambio.core.Task({self.coroutine}, {self.status}, {self.joined}, {self.result})"
@ -53,12 +54,4 @@ class Task:
return await _join(self, silent) return await _join(self, silent)
def get_result(self, silenced=False): def get_result(self, silenced=False):
if self.result: return self.result
if not silenced:
if self.result.exc:
raise self.result.exc
else:
return self.result.val
return self.result.val if self.result.val else self.result.exc

View File

@ -1,3 +1,4 @@
import types import types
from collections import deque, defaultdict from collections import deque, defaultdict
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
@ -58,10 +59,13 @@ class EventLoop:
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
except RuntimeError: except RuntimeError:
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
except Exception as has_raised: except CancelledError:
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task self.running.execution = "CANCELLED"
self.running.result = Result(None, has_raised) # Save the exception self.to_run.extend(self.joined.pop(self.running, ()))
raise except Exception as err:
self.running.execution = "ERRORED"
self.running.result = Result(None, err)
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
except KeyboardInterrupt: except KeyboardInterrupt:
self.running.throw(KeyboardInterrupt) self.running.throw(KeyboardInterrupt)
@ -139,7 +143,6 @@ class EventLoop:
def want_cancel(self, task): def want_cancel(self, task):
self.to_run.extend(self.joined.pop(self.running, ())) self.to_run.extend(self.joined.pop(self.running, ()))
self.to_run.append(self.running) # Reschedules the parent task self.to_run.append(self.running) # Reschedules the parent task
# task.cancelled = True
task.throw(CancelledError()) task.throw(CancelledError())
@ -151,4 +154,4 @@ class EventLoop:
await _want_write(sock) await _want_write(sock)
err = sock.getsockopt(SOL_SOCKET, SO_ERROR) err = sock.getsockopt(SOL_SOCKET, SO_ERROR)
if err != 0: if err != 0:
raise OSError(err, f'Connect call failed {addr}') raise OSError(err, f'Connect call failed: {addr}')

View File

@ -15,19 +15,21 @@ class TaskManager:
self.loop = loop # The event loop that spawned the TaskManager self.loop = loop # The event loop that spawned the TaskManager
self.silent = silent # Make exceptions silent? (not recommended) self.silent = silent # Make exceptions silent? (not recommended)
async def _cancel_and_raise(self, exc):
for task in self.tasks:
await task.cancel()
async def __aenter__(self): async def __aenter__(self):
return self return self
async def __aexit__(self, type, value, traceback): async def __aexit__(self, type, value, traceback):
if type: while self.tasks:
# TODO: Handle exceptions here task = self.tasks.popleft()
... self.values[task] = await task.join()
else: if task.result.exc:
for task in self.tasks: if task.result.exc != CancelledError:
if not task.cancelled: await self._cancel_and_raise(task.result.exc)
self.values[task] = await task.join()
else:
self.values[task] = CancelledError()
def spawn(self, coroutine: types.coroutine): def spawn(self, coroutine: types.coroutine):
"""Schedules a task for execution, appending it to the call stack""" """Schedules a task for execution, appending it to the call stack"""