Many fixes, many TODOs added

This commit is contained in:
nocturn9x 2020-03-25 22:36:17 +00:00
parent 9f4346b430
commit f9eb329ec9
4 changed files with 35 additions and 24 deletions

View File

@ -4,17 +4,17 @@ loop = giambio.EventLoop()
""" """
What works and what does not (23rd March 2020 23:24 PM) What works and what does not (25th March 2020 20:35)
- Run tasks concurrently: V - Run tasks concurrently: V
- Join mechanism: V - Join mechanism: V
- Sleep mechanism: V - Sleep mechanism: V
- Cancellation mechanism: X Note: Figure out how to rescheule parent task - Cancellation mechanism: V
- Exception propagation: V - Exception propagation: V
- Concurrent I/O: V - Concurrent I/O: V
- Return values of coroutines: V - Return values of coroutines: V
- Scheduling tasks for future execution: V - Scheduling tasks for future execution: V
- Task Spawner (context manager): X Note: Not Implemented - Task Spawner (context manager): V
""" """
@ -28,7 +28,7 @@ async def countdown(n):
return "Count DOWN over" return "Count DOWN over"
except giambio.CancelledError: except giambio.CancelledError:
print("countdown cancelled!") print("countdown cancelled!")
raise Exception("Oh no!") #TODO Propagate this
async def count(stop, step=1): async def count(stop, step=1):
try: try:
@ -44,13 +44,16 @@ async def count(stop, step=1):
async def main(): async def main():
print("Spawning countdown immediately, scheduling count for 4 secs from now") try:
async with giambio.TaskManager(loop) as manager: print("Spawning countdown immediately, scheduling count for 4 secs from now")
task = await manager.spawn(countdown(4)) async with giambio.TaskManager(loop) as manager:
await manager.schedule(count(8, 2), 4) task = manager.spawn(countdown(4))
await task.cancel() manager.schedule(count(8, 2), 4)
for task, ret in manager.values.items(): await task.cancel()
print(f"Function '{task.coroutine.__name__}' at {hex(id(task.coroutine))} returned an object of type '{type(ret).__name__}': {repr(ret)}") for task, ret in manager.values.items():
print(f"Function '{task.coroutine.__name__}' at {hex(id(task.coroutine))} returned an object of type '{type(ret).__name__}': {repr(ret)}")
except: # TODO: Fix this, see above
pass
loop.start(main) loop.start(main)

View File

@ -10,6 +10,7 @@ from .socket import AsyncSocket, WantRead, WantWrite
from .abstractions import Task, Result from .abstractions import Task, Result
from socket import SOL_SOCKET, SO_ERROR from socket import SOL_SOCKET, SO_ERROR
from .traps import _join, _sleep, _want_read, _want_write, _cancel from .traps import _join, _sleep, _want_read, _want_write, _cancel
from .util import TaskManager
class EventLoop: class EventLoop:
@ -59,10 +60,8 @@ class EventLoop:
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
except Exception as has_raised: except Exception as has_raised:
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
if self.running.joined: # Let the join function handle the hassle of propagating the error self.running.result = Result(None, has_raised) # Save the exception
self.running.result = Result(None, has_raised) # Save the exception raise
else: # Let the exception propagate (I'm looking at you asyncIO ;))
raise
except KeyboardInterrupt: except KeyboardInterrupt:
self.running.throw(KeyboardInterrupt) self.running.throw(KeyboardInterrupt)
@ -70,7 +69,7 @@ class EventLoop:
def start(self, coroutine: types.coroutine, *args, **kwargs): def start(self, coroutine: types.coroutine, *args, **kwargs):
"""Starts the event loop""" """Starts the event loop"""
self.to_run.append(coroutine(*args, **kwargs)) TaskManager(self).spawn(coroutine(*args, **kwargs))
self.loop() self.loop()
def want_read(self, sock: socket.socket): def want_read(self, sock: socket.socket):
@ -124,6 +123,7 @@ class EventLoop:
coroutine returns or, if an exception gets raised, the exception will get propagated inside the coroutine returns or, if an exception gets raised, the exception will get propagated inside the
parent task""" parent task"""
if coro not in self.joined: if coro not in self.joined:
self.joined[coro].append(self.running) self.joined[coro].append(self.running)
else: else:
@ -137,11 +137,12 @@ class EventLoop:
self.to_run.append(self.running) # Reschedule the task that called sleep self.to_run.append(self.running) # Reschedule the task that called sleep
def want_cancel(self, task): def want_cancel(self, task):
task.cancelled = True
self.to_run.extend(self.joined.pop(self.running, ())) self.to_run.extend(self.joined.pop(self.running, ()))
self.to_run.append(self.running) # Reschedules the parent task self.to_run.append(self.running) # Reschedules the parent task
# task.cancelled = True
task.throw(CancelledError()) task.throw(CancelledError())
async def connect_sock(self, sock: socket.socket, addr: tuple): async def connect_sock(self, sock: socket.socket, addr: tuple):
try: # "Borrowed" from curio try: # "Borrowed" from curio
result = sock.connect(addr) result = sock.connect(addr)

View File

@ -72,5 +72,6 @@ def _cancel(task):
be cancelled at any time be cancelled at any time
""" """
task.cancelled = True
yield "want_cancel", task yield "want_cancel", task

View File

@ -18,18 +18,23 @@ class TaskManager:
async def __aenter__(self): async def __aenter__(self):
return self return self
async def __aexit__(self, *args): async def __aexit__(self, type, value, traceback):
for task in self.tasks: if type:
if task.cancelled: # TODO: Handle exceptions here
self.values[task] = CancelledError() ...
else: else:
self.values[task] = await task.join(self.silent) for task in self.tasks:
if not task.cancelled:
self.values[task] = await task.join()
else:
self.values[task] = CancelledError()
def spawn(self, coroutine: types.coroutine): def spawn(self, coroutine: types.coroutine):
"""Schedules a task for execution, appending it to the call stack""" """Schedules a task for execution, appending it to the call stack"""
task = Task(coroutine, self) task = Task(coroutine, self)
self.loop.to_run.append(task) self.loop.to_run.append(task)
self.tasks.append(task)
return task return task
def schedule(self, coroutine: types.coroutine, when: int): def schedule(self, coroutine: types.coroutine, when: int):
@ -37,5 +42,6 @@ class TaskManager:
self.loop.sequence += 1 self.loop.sequence += 1
task = Task(coroutine, self) task = Task(coroutine, self)
self.tasks.append(task)
heappush(self.loop.paused, (self.loop.clock() + when, self.loop.sequence, task)) heappush(self.loop.paused, (self.loop.clock() + when, self.loop.sequence, task))
return task return task