Partial fix to the cancel method

This commit is contained in:
nocturn9x 2020-03-25 11:27:29 +01:00
parent 14c21b56de
commit c5da08a140
6 changed files with 17 additions and 16 deletions

View File

@ -45,11 +45,11 @@ async def main():
print("Spawning countdown immediately, scheduling count for 4 secs from now") print("Spawning countdown immediately, scheduling count for 4 secs from now")
task = loop.spawn(countdown(8)) task = loop.spawn(countdown(8))
task1 = loop.schedule(count(8, 2), 4) # Schedules the task, it will be ran 4 seconds from now task1 = loop.schedule(count(8, 2), 4) # Schedules the task, it will be ran 4 seconds from now
await giambio.sleep(0) # Act as a checkpoint to switch tasks. Beware! Cancelling a task straight away will propagate the error in the parent await giambio.sleep(0) # TODO: Fix this to avoid the need to use a checkpoint before cancelling
# await task.cancel() # TODO: Fix this to reschedule the parent task properly await task.cancel()
result = await task.join() # result = await task.join() # Would raise TaskError!
result1 = await task1.join() result1 = await task1.join()
print(f"countdown returned: {result}\ncount returned: {result1}") print(f"countdown returned: {None}\ncount returned: {result1}")
print("All done") print("All done")
loop.start(main) loop.start(main)

View File

@ -1,5 +1,5 @@
import types import types
from .traps import join, sleep, want_read, want_write, cancel
class Result: class Result:
"""A wrapper for results of coroutines""" """A wrapper for results of coroutines"""

View File

@ -7,9 +7,9 @@ from .exceptions import AlreadyJoinedError, CancelledError
from timeit import default_timer from timeit import default_timer
from time import sleep as wait from time import sleep as wait
from .socket import AsyncSocket, WantRead, WantWrite from .socket import AsyncSocket, WantRead, WantWrite
from .traps import join, sleep, want_read, want_write, cancel
from .abstractions import Task, Result from .abstractions import Task, Result
from socket import SOL_SOCKET, SO_ERROR from socket import SOL_SOCKET, SO_ERROR
from .traps import join, sleep, want_read, want_write, cancel
class EventLoop: class EventLoop:
@ -53,7 +53,7 @@ class EventLoop:
self.running.result = Result(e.args[0] if e.args else None, None) # Saves the return value self.running.result = Result(e.args[0] if e.args else None, None) # Saves the return value
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
except RuntimeError: except RuntimeError:
self.to_run.extend(self.joined.pop(self.running, ())) self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
self.to_run.append(self.running) self.to_run.append(self.running)
except Exception as has_raised: except Exception as has_raised:
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
@ -116,8 +116,6 @@ class EventLoop:
await want_read(sock) await want_read(sock)
return sock.accept() return sock.accept()
async def sock_sendall(self, sock: socket.socket, data: bytes): async def sock_sendall(self, sock: socket.socket, data: bytes):
"""Sends all the passed data, as bytes, trough the socket asynchronously""" """Sends all the passed data, as bytes, trough the socket asynchronously"""
@ -151,11 +149,11 @@ class EventLoop:
self.to_run.append(self.running) # Reschedule the task that called sleep self.to_run.append(self.running) # Reschedule the task that called sleep
def want_cancel(self, task): def want_cancel(self, task):
self.to_run.extend(self.joined.pop(self.running, ())) # Reschedules the parent task
task.cancelled = True task.cancelled = True
self.to_run.extend(self.joined.pop(self.running, ()))
self.to_run.append(self.running) # Reschedules the parent task
task.throw(CancelledError()) task.throw(CancelledError())
async def connect_sock(self, sock: socket.socket, addr: tuple): async def connect_sock(self, sock: socket.socket, addr: tuple):
try: # "Borrowed" from curio try: # "Borrowed" from curio
result = sock.connect(addr) result = sock.connect(addr)

View File

@ -12,3 +12,6 @@ class CancelledError(GiambioError):
def __repr__(self): def __repr__(self):
return "giambio.exceptions.CancelledError" return "giambio.exceptions.CancelledError"
class TaskCancelled(GiambioError):
"""This exception is raised when the user attempts to join a cancelled task"""

View File

@ -2,8 +2,7 @@
import types import types
import socket import socket
from .abstractions import Task from .exceptions import TaskCancelled
@types.coroutine @types.coroutine
def sleep(seconds: int): def sleep(seconds: int):
@ -35,16 +34,18 @@ def want_write(sock: socket.socket):
@types.coroutine @types.coroutine
def join(task: Task): def join(task):
"""'Tells' the scheduler that the desired task MUST be awaited for completion""" """'Tells' the scheduler that the desired task MUST be awaited for completion"""
if task.cancelled:
raise TaskCancelled("Cannot join cancelled task!")
task.joined = True task.joined = True
yield "want_join", task yield "want_join", task
return task.get_result() # This raises an exception if the child task errored return task.get_result() # This raises an exception if the child task errored
@types.coroutine @types.coroutine
def cancel(task: Task): def cancel(task):
"""'Tells' the scheduler that the passed task must be cancelled""" """'Tells' the scheduler that the passed task must be cancelled"""
yield "want_cancel", task yield "want_cancel", task

View File

@ -3,7 +3,6 @@ from giambio.socket import AsyncSocket
import socket import socket
import logging import logging
loop = giambio.core.EventLoop() loop = giambio.core.EventLoop()
logging.basicConfig(level=20, logging.basicConfig(level=20,