Fixed some bugs with exception propagation and other stuff. I/O is significantly broken on the exception-handling side

nocturn9x 2020-12-19 15:18:12 +01:00
parent 98c9440115
commit 29683f9067
7 changed files with 165 additions and 114 deletions

View File

@@ -40,6 +40,7 @@ class TaskManager:
self.timeout = self.started + timeout
else:
self.timeout = None
self.timed_out = False
def spawn(self, func: types.FunctionType, *args):
"""
@@ -78,7 +79,6 @@ class TaskManager:
# end of the block and wait for all
# children to exit
await task.join()
self.tasks.remove(task)
async def cancel(self):
"""
@@ -88,3 +88,6 @@ class TaskManager:
# TODO: This breaks, somehow, investigation needed
for task in self.tasks:
await task.cancel()
def done(self):
return all([task.done() for task in self.tasks])
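For context, the new timed_out flag and done() helper reduce pool bookkeeping to plain attribute checks. A minimal sketch of the aggregation pattern, using a hypothetical stand-in Task rather than giambio's actual class:

class Task:
    def __init__(self):
        self.finished = False
        self.cancelled = False
        self.exc = None

    def done(self):
        # A task is done once it finished, crashed or was cancelled
        return self.finished or self.cancelled or self.exc is not None

class TaskManager:
    def __init__(self):
        self.tasks = []

    def done(self):
        # True only when every child task has exited in some way
        return all(task.done() for task in self.tasks)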

View File

@@ -33,9 +33,10 @@ from giambio.exceptions import (InternalError,
TooSlowError
)
# TODO: Take into account SSLWantReadError and SSLWantWriteError
IOInterrupt = (BlockingIOError, InterruptedError)
IO_SKIP_LIMIT = 5 # TODO: Inspect this
# TODO: Right now this value is pretty much arbitrary; we need some heuristic testing to choose a sensible default
IO_SKIP_LIMIT = 5
class AsyncScheduler:
@@ -44,8 +45,8 @@ class AsyncScheduler:
model in its simplicity, without using actual threads, but rather alternating
across coroutines execution to let more than one thing at a time to proceed
with its calculations. An attempt to fix the threaded model has been made
without making the API unnecessarily complicated.
A few examples are tasks cancellation and exception propagation.
without making the API unnecessarily complicated. A few examples are task
cancellation and exception propagation.
"""
def __init__(self, clock: types.FunctionType = default_timer, debugger: BaseDebugger = None):
@@ -77,7 +78,7 @@ class AsyncScheduler:
self.has_ran = False
# The current pool
self.current_pool = None
# How many times we skipped I/O checks to let a task run
# How many times we skipped I/O checks to let a task run.
# We limit the number of times we skip such checks to avoid
# I/O starvation in highly concurrent systems
self.io_skip = 0
@@ -86,7 +87,7 @@ class AsyncScheduler:
def done(self):
"""
Returns True if there is work to do
Returns True if there is no work to do
"""
if any([self.paused, self.tasks, self.events, self.selector.get_map()]):
@@ -99,11 +100,12 @@ class AsyncScheduler:
"""
self.selector.close()
# TODO: Anything else?
def run(self):
"""
Starts the loop and 'listens' for events until there is work to do,
then exits. This behavior kinda reflects a kernel, as coroutines can
then exits. This behavior kinda resembles a kernel, as coroutines can
request the loop's functionality only through some fixed entry points,
which in turn yield and give execution control to the loop itself.
"""
@@ -111,33 +113,44 @@ class AsyncScheduler:
while True:
try:
if self.done():
# If we're done, which means there are no
# sleeping tasks, no events to deliver,
# no I/O to do and no running tasks, we
# If we're done, which means there are
# both no paused tasks and no running tasks, we
# simply tear us down and return to self.start
self.close()
break
elif not self.tasks:
# We start by checking for I/O
self.check_io()
# If there are no actively running tasks, we start by checking
# for I/O. This method will wait for I/O until the closest deadline
# to avoid starving sleeping tasks
if self.selector.get_map():
self.check_io()
if self.deadlines:
# Then we start checking for deadlines, if there are any
self.expire_deadlines()
if self.paused:
# Next, if there are no actively running tasks
# we try to schedule the asleep ones
# Next we try to (re)schedule the asleep tasks
self.awake_sleeping()
# Then we try to awake event-waiting tasks
if self.events:
self.check_events()
# Otherwise, while there are tasks ready to run, well, run them!
# Otherwise, while there are tasks ready to run, we run them!
while self.tasks:
# Sets the currently running task
self.current_task = self.tasks.pop(0)
# Sets the current pool (for nested pools)
self.current_pool = self.current_task.pool
if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out:
# Stores deadlines for tasks (deadlines are pool-specific).
# The deadlines queue will internally make sure not to store
# a deadline for the same pool twice. This makes the timeouts
# model less flexible, because one can't change the timeout
# after it is set, but it makes the implementation easier.
self.deadlines.put(self.current_pool.timeout, self.current_pool)
self.debugger.before_task_step(self.current_task)
if self.current_task.cancel_pending:
# We perform the deferred cancellation
# if it was previously scheduled
self.do_cancel()
self.cancel(self.current_task)
if self.to_send and self.current_task.status != "init":
# A little setup to send objects from and to
# coroutines outside the event loop
@@ -145,7 +158,8 @@ class AsyncScheduler:
else:
# The first time coroutines' method .send() wants None!
data = None
# Run a single step with the calculation
# Run a single step with the calculation (i.e. until a yield
# somewhere)
method, *args = self.current_task.run(data)
# Some debugging and internal chatter here
self.current_task.status = "run"
@@ -158,27 +172,11 @@ class AsyncScheduler:
getattr(self, method)(*args)
except AttributeError: # If this happens, that's quite bad!
# This exception block is meant to be triggered by other async
# libraries, which most likely have different trap names and behaviors.
# If you get this exception and you're 100% sure you're not mixing
# async primitives from other libraries, then it's a bug!
# libraries, which most likely have different trap names and behaviors
# compared to us. If you get this exception and you're 100% sure you're
# not mixing async primitives from other libraries, then it's a bug!
raise InternalError("Uh oh! Something very bad just happened, did"
" you try to mix primitives from other async libraries?") from None
except CancelledError:
# When a task needs to be cancelled, giambio tries to do it gracefully
# first: if the task is paused in either I/O or sleeping, that's perfect.
# But we also need to cancel a task if it was not sleeping or waiting on
# any I/O because it could never do so (therefore blocking everything
# forever). So, when cancellation can't be done right away, we schedule
# if for the next execution step of the task. Giambio will also make sure
# to re-raise cancellations at every checkpoint until the task lets the
# exception propagate into us, because we *really* want the task to be
# cancelled, and since asking kindly didn't work we have to use some
# force :)
self.current_task.status = "cancelled"
self.current_task.cancelled = True
self.current_task.cancel_pending = False
self.debugger.after_cancel(self.current_task)
self.join(self.current_task)
except StopIteration as ret:
# At the end of the day, coroutines are generator functions with
# some tricky behaviors, and this is one of them. When a coroutine
@@ -200,16 +198,15 @@ class AsyncScheduler:
self.current_task.exc.__traceback__ = self.current_task.exc.__traceback__.tb_next
self.current_task.status = "crashed"
self.debugger.on_exception_raised(self.current_task, err)
self.join(self.current_task) # This propagates the exception
self.join(self.current_task)
def do_cancel(self, task: Task = None):
def do_cancel(self, task: Task):
"""
Performs task cancellation by throwing CancelledError inside the given
task in order to stop it from running. The loop continues to execute
as tasks are independent
"""
task = task or self.current_task
if not task.cancelled and not task.exc:
self.debugger.before_cancel(task)
task.throw(CancelledError())
@@ -222,6 +219,17 @@ class AsyncScheduler:
self.tasks.append(self.current_task)
self.to_send = self.current_task
def expire_deadlines(self):
"""
Handles expiring deadlines by raising an exception
inside the correct pool if its timeout expired
"""
while self.deadlines and self.deadlines[0][0] <= self.clock():
_, __, pool = self.deadlines.get()
pool.timed_out = True
self.current_task.throw(TooSlowError())
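The throw() call above piggybacks on the generator protocol: the exception materializes inside the coroutine at its current suspension point. A self-contained illustration with a plain generator and RuntimeError standing in for TooSlowError:

def worker():
    try:
        while True:
            yield
    except RuntimeError:
        # Stand-in for TooSlowError landing at the task's yield point
        print("deadline expired inside the task")

gen = worker()
next(gen)  # advance to the first yield
try:
    gen.throw(RuntimeError())
except StopIteration:
    pass  # the generator handled the exception and returned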
def check_events(self):
"""
Checks for ready or expired events and triggers them
@@ -229,6 +237,11 @@ class AsyncScheduler:
for event in self.events.copy():
if event.set:
# When an event is set, all the tasks
# that called wait() on it are woken up.
# Since events can only be triggered once,
# we discard the event object from our
# set after we've rescheduled its waiters.
event.event_caught = True
self.tasks.extend(event.waiters)
self.events.remove(event)
@@ -239,17 +252,19 @@ class AsyncScheduler:
has elapsed
"""
while self.paused and self.paused[0][0] < self.clock():
while self.paused and self.paused[0][0] <= self.clock():
# Reschedules tasks when their deadline has elapsed
task = self.paused.get()
slept = self.clock() - task.sleep_start
task.sleep_start = 0.0
self.tasks.append(task)
self.debugger.after_sleep(task, slept)
if not task.done():
slept = self.clock() - task.sleep_start
task.sleep_start = 0.0
self.tasks.append(task)
self.debugger.after_sleep(task, slept)
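The paused queue is essentially a min-heap keyed on wake-up time, so waking sleepers is just popping entries whose deadline has passed. A rough re-creation of the loop above (heap layout assumed, not giambio's exact TimeQueue):

import heapq
import time

paused = []     # min-heap of (wakeup_time, sequence, task)
run_queue = []  # tasks ready to be stepped again

heapq.heappush(paused, (time.monotonic() + 2.0, 0, "some-task"))

def awake_sleeping():
    # Pop every task whose wake-up time has elapsed and reschedule it
    while paused and paused[0][0] <= time.monotonic():
        _, _, task = heapq.heappop(paused)
        run_queue.append(task)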
def check_io(self):
"""
Checks and schedules task to perform I/O
Checks for I/O and implements the sleeping mechanism
for the event loop
"""
before_time = self.clock() # Used for the debugger
@@ -265,33 +280,28 @@ class AsyncScheduler:
# If there are either tasks or events and no I/O, don't wait
# (unless we already skipped this check too many times)
timeout = 0.0
elif self.paused:
# If there are asleep tasks, wait until the closest deadline
elif self.paused or self.deadlines:
# If there are asleep tasks or deadlines, wait until the closest date
if not self.deadlines:
# If there are no deadlines just wait until the first task wakeup
timeout = min([max(0.0, self.paused[0][0] - self.clock())])
elif not self.paused:
# If there are no sleeping tasks just wait until the first deadline
timeout = min([max(0.0, self.deadlines[0][0] - self.clock())])
else:
deadline = self.deadlines.get()
timeout = min([max(0.0, self.paused[0][0] - self.clock()), deadline])
if timeout != deadline:
# If a sleeping tasks has to run
# before another deadline, we schedule the former
# first and put back the latter on the queue
self.deadlines.put(deadline)
# If there are both deadlines AND sleeping tasks scheduled we calculate
# the absolute closest deadline among the two sets and use that as a timeout
clock = self.clock()
timeout = min([max(0.0, self.paused[0][0] - clock), self.deadlines[0][0] - clock])
else:
# If there is *only* I/O, we wait a fixed amount of time
timeout = 86400 # Thanks trio :D
if self.selector.get_map():
self.debugger.before_io(timeout)
io_ready = self.selector.select(timeout)
# Get sockets that are ready and schedule their tasks
for key, _ in io_ready:
self.tasks.append(key.data) # Resource ready? Schedule its task
self.debugger.after_io(self.clock() - before_time)
else:
# Since select() does not work with 0 fds registered
# we need to call time.sleep() if we need to pause
# and no I/O has been registered
wait(timeout)
timeout = 86400 # Thanks trio :D
self.debugger.before_io(timeout)
io_ready = self.selector.select(timeout)
# Get sockets that are ready and schedule their tasks
for key, _ in io_ready:
self.tasks.append(key.data) # Resource ready? Schedule its task
self.debugger.after_io(self.clock() - before_time)
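That select() call is the only place the loop actually blocks, and the computed timeout is what keeps sleepers and deadlines from being starved by I/O waits. A bare-bones version of the same mechanism using only the standard library:

import selectors
import socket

sel = selectors.DefaultSelector()
reader, writer = socket.socketpair()
# Like the loop does, we stash the waiting "task" in key.data at registration
sel.register(reader, selectors.EVENT_READ, data="task-waiting-on-reader")

writer.send(b"x")
for key, _ in sel.select(timeout=0.1):
    print("rescheduling", key.data)  # resource ready? schedule its task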
def start(self, func: types.FunctionType, *args):
"""
@@ -338,20 +348,41 @@ class AsyncScheduler:
if pool:
for to_cancel in pool.tasks:
self.cancel(to_cancel)
pool.cancelled = True
return all([t.cancelled or t.finished or t.exc for t in pool.tasks])
# If pool.done() equals True, then self.join() can
# safely proceed and reschedule the parent of the
# current pool. If, however, there are still some
# tasks running, we wait for them to exit in order
# to avoid orphaned tasks
return pool.done()
else: # If we're at the main task, we're sure everything else exited
return True
def get_io_tasks(self):
"""
Return all tasks waiting on I/O resources
"""
return [k.data for k in self.selector.get_map().values()]
def get_all_tasks(self):
"""
Returns all tasks which the loop is currently keeping track of.
This includes both running and paused tasks. A paused task is a
task which is either waiting on an I/O resource, sleeping, or
waiting on an event to be triggered
"""
return chain(self.tasks, self.paused, self.get_event_tasks(), self.get_io_tasks())
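For reference, itertools.chain concatenates the underlying iterables lazily, so get_all_tasks() walks every category without building intermediate lists:

from itertools import chain

running, sleeping, waiting = ["task-a"], ["task-b"], ["task-c"]
print(list(chain(running, sleeping, waiting)))  # ['task-a', 'task-b', 'task-c']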
def cancel_all(self):
"""
Cancels ALL tasks; this method is called as a result
of self.close()
"""
for to_cancel in chain(self.tasks, self.paused, self.get_event_tasks()):
for to_cancel in self.get_all_tasks():
self.cancel(to_cancel)
return all([t.cancelled or t.exc or t.finished for t in chain(self.tasks, self.paused, self.get_event_tasks())])
return all([t.done() for t in self.get_all_tasks()])
def close(self, *, ensure_done: bool = True):
"""
@@ -359,7 +390,7 @@ class AsyncScheduler:
inside it and tearing down any extra machinery.
If ensure_done equals False, the loop will cancel *ALL*
running and scheduled tasks and then tear itself down.
If ensure_done equals False, which is the default behavior,
If ensure_done equals True, which is the default behavior,
this method will raise a GiambioError if the loop hasn't
finished running.
"""
@@ -377,15 +408,9 @@ class AsyncScheduler:
task.join() on the task object)
"""
if self.current_pool is None:
if not self.done():
return
else:
self.reschedule_joiners(task)
return
task.joined = True
if task.finished or task.cancelled:
if all([t.finished or t.cancelled for t in self.current_pool.tasks]):
if self.current_pool and self.current_pool.done() or not self.current_pool:
self.reschedule_joiners(task)
elif task.exc:
if self.cancel_all_from_current_pool():
@@ -406,7 +431,7 @@ class AsyncScheduler:
self.current_task.status = "sleep"
self.current_task.sleep_start = self.clock()
self.paused.put(self.current_task, seconds)
self.current_task.next_deadline = self.clock() + seconds
self.current_task.next_deadline = self.current_task.sleep_start + seconds
else:
self.tasks.append(self.current_task)
@@ -416,7 +441,10 @@ class AsyncScheduler:
or does so straight away if it is safe to do so
"""
if task.status in ("io", "sleep", "init"):
if task.done():
# The task has already exited, nothing to cancel!
return
elif task.status in ("io", "sleep", "init"):
# We cancel immediately only in a context where it's safer to do
# so. The concept of "safer" is quite tricky, because even though the
# task is technically not running, it might leave some unfinished state
@ -425,11 +453,21 @@ class AsyncScheduler:
try:
self.do_cancel(task)
except CancelledError:
# Task was cancelled
# When a task needs to be cancelled, giambio tries to do it gracefully
# first: if the task is paused in either I/O or sleeping, that's perfect.
# But we also need to cancel a task if it was not sleeping or waiting on
# any I/O because it could never do so (therefore blocking everything
# forever). So, when cancellation can't be done right away, we schedule
# it for the next execution step of the task. Giambio will also make sure
# to re-raise cancellations at every checkpoint until the task lets the
# exception propagate into us, because we *really* want the task to be
# cancelled, and since asking kindly didn't work we have to use some
# force :)
task.status = "cancelled"
task.cancelled = True
task.cancel_pending = False
self.debugger.after_cancel(task)
self.paused.discard(task)
else:
# If we can't cancel in a somewhat "graceful" way, we just
# defer this operation for later (check run() for more info)
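The deferred path just flips cancel_pending and lets run() re-raise on the task's next step. Roughly, with a hypothetical minimal task wrapper (a plain exception stands in for giambio's CancelledError):

class CancelledError(BaseException):
    # Stand-in for giambio.exceptions.CancelledError
    pass

class Task:
    def __init__(self, coro):
        self.coroutine = coro
        self.cancel_pending = False

    def run(self, data=None):
        if self.cancel_pending:
            # Re-raise at every step until the coroutine finally
            # lets the exception propagate back into the loop
            return self.coroutine.throw(CancelledError())
        return self.coroutine.send(data)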
@@ -534,10 +572,3 @@ class AsyncScheduler:
await want_write(sock)
return sock.connect(addr)
def __del__(self):
"""
Garbage collects itself
"""
self.close()

View File

@@ -17,7 +17,7 @@ limitations under the License.
"""
from giambio.traps import join, cancel, event_set, event_wait
from heapq import heappop, heappush
from heapq import heappop, heappush, heapify
from giambio.exceptions import GiambioError
from dataclasses import dataclass, field
import typing
@@ -80,6 +80,9 @@ class Task:
def __hash__(self):
return hash(self.coroutine)
def done(self):
return self.exc or self.finished or self.cancelled
class Event:
"""
@@ -101,7 +104,7 @@ class Event:
pause() on us
"""
if self.set:
if self.set: # This is set by the event loop internally
raise GiambioError("The event has already been set")
await event_set(self)
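Stripped of the trap machinery, an event is a one-shot flag plus a list of parked waiters that the loop reschedules in check_events(). A minimal sketch of the idea (RuntimeError standing in for GiambioError):

class Event:
    def __init__(self):
        self.set = False    # flipped exactly once
        self.waiters = []   # tasks parked by wait() until the flag flips

    def trigger(self):
        if self.set:
            raise RuntimeError("The event has already been set")
        self.set = True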
@@ -132,7 +135,17 @@ class TimeQueue:
self.container = []
def __contains__(self, item):
return item in self.container
for i in self.container:
if i[2] == item:
return True
return False
def discard(self, item):
for i in self.container:
if i[2] == item:
self.container.remove(i)
heapify(self.container)
return
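Since removing an arbitrary entry breaks the heap invariant, discard() has to heapify afterwards. The same idea in isolation:

import heapq

heap = [(3, 1, "b"), (1, 0, "a"), (2, 2, "c")]  # (deadline, sequence, task)
heapq.heapify(heap)

def discard(task):
    for entry in heap:
        if entry[2] == task:
            heap.remove(entry)   # O(n) removal leaves the list unordered...
            heapq.heapify(heap)  # ...so restore the heap invariant
            return

discard("b")
print(heap[0])  # (1, 0, 'a') is still the smallest entry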
def __iter__(self):
return self
@@ -180,6 +193,7 @@ class DeadlinesQueue(TimeQueue):
"""
super().__init__(None)
self.pools = set()
def __contains__(self, item):
return super().__contains__(item)
@@ -199,17 +213,21 @@ class DeadlinesQueue(TimeQueue):
def __repr__(self):
return f"DeadlinesQueue({self.container})"
def put(self, amount: float):
def put(self, amount: float, pool):
"""
Pushes a deadline (timeout) onto the queue
Pushes a deadline (timeout) onto the queue with its associated
pool
"""
heappush(self.container, (amount, self.sequence))
self.sequence += 1
if pool not in self.pools:
self.pools.add(pool)
heappush(self.container, (amount, self.sequence, pool))
def get(self):
"""
Gets the first task that is meant to run
"""
return super().get()
d = heappop(self.container)
self.pools.discard(d[2])
return d
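Putting the pieces together: the queue is a min-heap of (deadline, sequence, pool) triples, with a set providing the per-pool deduplication described in run(). A rough standalone re-creation, not byte-for-byte the class above:

import heapq

class DeadlinesQueue:
    def __init__(self):
        self.container = []  # min-heap of (deadline, sequence, pool)
        self.pools = set()   # at most one deadline per pool
        self.sequence = 0    # tie-breaker for equal deadlines

    def put(self, deadline: float, pool):
        if pool not in self.pools:
            self.pools.add(pool)
            heapq.heappush(self.container, (deadline, self.sequence, pool))
            self.sequence += 1

    def get(self):
        deadline, _, pool = heapq.heappop(self.container)
        self.pools.discard(pool)
        return deadline, pool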

View File

@@ -94,4 +94,4 @@ def wrap_socket(sock: socket.socket) -> AsyncSocket:
Wraps a standard socket into an async socket
"""
return AsyncSocket(sock)
return AsyncSocket(sock)

View File

@@ -59,7 +59,7 @@ async def sleep(seconds: int):
async def current_task():
"""
Gets the currently running task
Gets the currently running task in an asynchronous fashion
"""
return await create_trap("get_current")

View File

@@ -6,14 +6,14 @@ async def child():
print("[child] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child] Had a nice nap!")
# raise TypeError("rip")
raise TypeError("rip")
async def child1():
print("[child 1] Child spawned!! Sleeping for 4 seconds")
await giambio.sleep(4)
print("[child 1] Child spawned!! Sleeping for 8 seconds")
await giambio.sleep(8)
print("[child 1] Had a nice nap!")
raise TypeError("rip")
# raise TypeError("rip")
async def main():

View File

@@ -13,17 +13,16 @@ async def serve(address: tuple):
sock.bind(address)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.listen(5)
asock = giambio.wrap_socket(sock) # We make the socket an async socket
async_sock = giambio.wrap_socket(sock) # We make the socket an async socket
logging.info(f"Serving asynchronously at {address[0]}:{address[1]}")
async with giambio.create_pool() as pool:
while True:
conn, addr = await asock.accept()
logging.info(f"{addr[0]}:{addr[1]} connected")
pool.spawn(handler, conn, addr)
conn, address_tuple = await async_sock.accept()
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
pool.spawn(handler, conn, address_tuple)
async def handler(sock: AsyncSocket, addr: tuple):
addr = f"{addr[0]}:{addr[1]}"
address = f"{addr[0]}:{addr[1]}"
async with sock:
await sock.send_all(b"Welcome to the server pal, feel free to send me something!\n")
while True:
@@ -36,17 +35,17 @@ async def handler(sock: AsyncSocket, addr: tuple):
raise TypeError("Oh, no, I'm gonna die!")
to_send_back = data
data = data.decode("utf-8").encode("unicode_escape")
logging.info(f"Got: '{data.decode('utf-8')}' from {addr}")
logging.info(f"Got: '{data.decode('utf-8')}' from {address}")
await sock.send_all(b"Got: " + to_send_back)
logging.info(f"Echoed back '{data.decode('utf-8')}' to {addr}")
logging.info(f"Connection from {addr} closed")
logging.info(f"Echoed back '{data.decode('utf-8')}' to {address}")
logging.info(f"Connection from {address} closed")
if __name__ == "__main__":
port = int(sys.argv[1]) if len(sys.argv) > 1 else 1500
logging.basicConfig(level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p")
try:
giambio.run(serve, ("localhost", port), debugger=None)
giambio.run(serve, ("localhost", port), debugger=Debugger())
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
if isinstance(error, KeyboardInterrupt):
logging.info("Ctrl+C detected, exiting")