diff --git a/giambio/context.py b/giambio/context.py
index a5a3d3a..dd195d0 100644
--- a/giambio/context.py
+++ b/giambio/context.py
@@ -40,6 +40,7 @@ class TaskManager:
             self.timeout = self.started + timeout
         else:
             self.timeout = None
+        self.timed_out = False
 
     def spawn(self, func: types.FunctionType, *args):
         """
@@ -78,7 +79,6 @@ class TaskManager:
             # end of the block and wait for all
             # children to exit
             await task.join()
-            self.tasks.remove(task)
 
     async def cancel(self):
         """
@@ -88,3 +88,6 @@ class TaskManager:
         # TODO: This breaks, somehow, investigation needed
         for task in self.tasks:
             await task.cancel()
+
+    def done(self):
+        return all([task.done() for task in self.tasks])
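The new `timed_out` flag and the `done()` predicate give the scheduler a single place to ask whether a pool has fully unwound, which the core loop below leans on for joining and timeout bookkeeping. A minimal usage sketch of the pool semantics (assuming `create_pool()` is giambio's public way of constructing a `TaskManager`, as in the tests further down; `worker` is illustrative):

```python
import giambio

async def worker(n: int):
    print(f"worker sleeping for {n} seconds")
    await giambio.sleep(n)
    print(f"worker slept {n} seconds")

async def main():
    async with giambio.create_pool() as pool:
        pool.spawn(worker, 1)
        pool.spawn(worker, 2)
        # The async with block only exits once every child has
        # finished, crashed or been cancelled, i.e. once
        # pool.done() would return True
    print("all children have exited")

giambio.run(main)
```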
diff --git a/giambio/core.py b/giambio/core.py
index 979b4a1..d429895 100644
--- a/giambio/core.py
+++ b/giambio/core.py
@@ -33,9 +33,10 @@ from giambio.exceptions import (InternalError,
                                 TooSlowError
                                 )
 
-
+# TODO: Take into account SSLWantReadError and SSLWantWriteError
 IOInterrupt = (BlockingIOError, InterruptedError)
-IO_SKIP_LIMIT = 5  # TODO: Inspect this
+# TODO: Right now this value is pretty much arbitrary, we need some heuristic testing to choose a sensible default
+IO_SKIP_LIMIT = 5
 
 
 class AsyncScheduler:
@@ -44,8 +45,8 @@
     model in its simplicity, without using actual threads, but rather alternating
-    across coroutines execution to let more than one thing at a time to proceed
+    across coroutines' execution to let more than one thing at a time proceed
     with its calculations. An attempt to fix the threaded model has been made
-    without making the API unnecessarily complicated.
-    A few examples are tasks cancellation and exception propagation.
+    without making the API unnecessarily complicated. A few examples are task
+    cancellation and exception propagation.
     """
 
     def __init__(self, clock: types.FunctionType = default_timer, debugger: BaseDebugger = None):
@@ -77,7 +78,7 @@
         self.has_ran = False
         # The current pool
         self.current_pool = None
-        # How many times we skipped I/O checks to let a task run
+        # How many times we skipped I/O checks to let a task run.
         # We limit the number of times we skip such checks to avoid
         # I/O starvation in highly concurrent systems
        self.io_skip = 0
@@ -86,7 +87,7 @@
 
     def done(self):
         """
-        Returns True if there is work to do
+        Returns True if there is no work to do
        """
 
        if any([self.paused, self.tasks, self.events, self.selector.get_map()]):
@@ -99,11 +100,12 @@
         """
 
         self.selector.close()
+        # TODO: Anything else?
 
     def run(self):
         """
         Starts the loop and 'listens' for events until there is work to do,
-        then exits. This behavior kinda reflects a kernel, as coroutines can
-        request the loop's functionality only trough some fixed entry points,
+        then exits. This behavior kinda resembles a kernel, as coroutines can
+        request the loop's functionality only through some fixed entry points,
         which in turn yield and give execution control to the loop itself.
         """
@@ -111,33 +113,44 @@
         while True:
             try:
                 if self.done():
-                    # If we're done, which means there are no
-                    # sleeping tasks, no events to deliver,
-                    # no I/O to do and no running tasks, we
+                    # If we're done, which means there are
+                    # both no paused tasks and no running tasks, we
                     # simply tear us down and return to self.start
                     self.close()
                     break
                 elif not self.tasks:
-                    # We start by checking for I/O
-                    self.check_io()
+                    # If there are no actively running tasks, we start by checking
+                    # for I/O. This method will wait for I/O until the closest deadline
+                    # to avoid starving sleeping tasks
+                    if self.selector.get_map():
+                        self.check_io()
+                    if self.deadlines:
+                        # Then we start checking for deadlines, if there are any
+                        self.expire_deadlines()
                     if self.paused:
-                        # Next, if there are no actively running tasks
-                        # we try to schedule the asleep ones
+                        # Next we try to (re)schedule the asleep tasks
                         self.awake_sleeping()
                     # Then we try to awake event-waiting tasks
                     if self.events:
                         self.check_events()
-                # Otherwise, while there are tasks ready to run, well, run them!
+                # Otherwise, while there are tasks ready to run, we run them!
                 while self.tasks:
                     # Sets the currently running task
                     self.current_task = self.tasks.pop(0)
                     # Sets the current pool (for nested pools)
                     self.current_pool = self.current_task.pool
+                    if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out:
+                        # Stores deadlines for tasks (deadlines are pool-specific).
+                        # The deadlines queue will internally make sure not to store
+                        # a deadline for the same pool twice. This makes the timeouts
+                        # model less flexible, because one can't change the timeout
+                        # after it is set, but it makes the implementation easier
+                        self.deadlines.put(self.current_pool.timeout, self.current_pool)
                     self.debugger.before_task_step(self.current_task)
                     if self.current_task.cancel_pending:
                         # We perform the deferred cancellation
                         # if it was previously scheduled
-                        self.do_cancel()
+                        self.cancel(self.current_task)
                     if self.to_send and self.current_task.status != "init":
                         # A little setup to send objects from and to
                         # coroutines outside the event loop
@@ -145,7 +158,8 @@
                     else:
                         # The first time coroutines' method .send() wants None!
                         data = None
-                    # Run a single step with the calculation
+                    # Run a single step with the calculation (i.e. until a yield
+                    # somewhere)
                     method, *args = self.current_task.run(data)
                     # Some debugging and internal chatter here
                     self.current_task.status = "run"
@@ -158,27 +172,11 @@
                     getattr(self, method)(*args)
                 except AttributeError:  # If this happens, that's quite bad!
                     # This exception block is meant to be triggered by other async
-                    # libraries, which most likely have different trap names and behaviors.
-                    # If you get this exception and you're 100% sure you're not mixing
-                    # async primitives from other libraries, then it's a bug!
+                    # libraries, which most likely have different trap names and behaviors
+                    # compared to us. If you get this exception and you're 100% sure you're
+                    # not mixing async primitives from other libraries, then it's a bug!
                     raise InternalError("Uh oh! Something very bad just happened, did"
                                         " you try to mix primitives from other async libraries?") from None
-                except CancelledError:
-                    # When a task needs to be cancelled, giambio tries to do it gracefully
-                    # first: if the task is paused in either I/O or sleeping, that's perfect.
-                    # But we also need to cancel a task if it was not sleeping or waiting on
-                    # any I/O because it could never do so (therefore blocking everything
-                    # forever). So, when cancellation can't be done right away, we schedule
-                    # if for the next execution step of the task. Giambio will also make sure
-                    # to re-raise cancellations at every checkpoint until the task lets the
-                    # exception propagate into us, because we *really* want the task to be
-                    # cancelled, and since asking kindly didn't work we have to use some
-                    # force :)
-                    self.current_task.status = "cancelled"
-                    self.current_task.cancelled = True
-                    self.current_task.cancel_pending = False
-                    self.debugger.after_cancel(self.current_task)
-                    self.join(self.current_task)
                 except StopIteration as ret:
                     # At the end of the day, coroutines are generator functions with
                     # some tricky behaviors, and this is one of them. When a coroutine
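The `getattr(self, method)(*args)` dispatch above is the entire "trap" protocol: a coroutine yields a method name plus its arguments, and the loop calls that method on itself. A simplified sketch of what a trap such as `giambio.traps.create_trap` plausibly looks like (the real implementation may differ; `types.coroutine` is what makes a plain generator awaitable):

```python
import types

@types.coroutine
def create_trap(method, *args):
    # Hand (method, *args) to the loop; run() unpacks this as
    # "method, *args = self.current_task.run(data)" and then
    # dispatches it via getattr(self, method)(*args)
    data = yield (method, *args)
    # Whatever the loop stored in self.to_send comes back here
    # as the result of the await
    return data

async def current_task():
    # Asks the loop to hand back the Task object it is running
    return await create_trap("get_current")
```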
@@ -200,16 +198,15 @@
                         self.current_task.exc.__traceback__ = self.current_task.exc.__traceback__.tb_next
                     self.current_task.status = "crashed"
                     self.debugger.on_exception_raised(self.current_task, err)
-                    self.join(self.current_task)  # This propagates the exception
+                    self.join(self.current_task)
 
-    def do_cancel(self, task: Task = None):
+    def do_cancel(self, task: Task):
         """
         Performs task cancellation by throwing CancelledError inside the given
         task in order to stop it from running. The loop continues to execute
         as tasks are independent
         """
 
-        task = task or self.current_task
         if not task.cancelled and not task.exc:
             self.debugger.before_cancel(task)
             task.throw(CancelledError())
@@ -222,6 +219,17 @@
         self.tasks.append(self.current_task)
         self.to_send = self.current_task
 
+    def expire_deadlines(self):
+        """
+        Handles expiring deadlines by raising an exception
+        inside the correct pool if its timeout expired
+        """
+
+        while self.deadlines and self.deadlines[0][0] <= self.clock():
+            _, __, pool = self.deadlines.get()
+            pool.timed_out = True
+            self.current_task.throw(TooSlowError())
+
     def check_events(self):
         """
         Checks for ready or expired events and triggers them
@@ -229,6 +237,11 @@
 
         for event in self.events.copy():
             if event.set:
+                # When an event is set, all the tasks
+                # that called wait() on it are woken up.
+                # Since events can only be triggered once,
+                # we discard the event object from our
+                # set after we've rescheduled its waiters.
                 event.event_caught = True
                 self.tasks.extend(event.waiters)
                 self.events.remove(event)
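From a task's point of view the rescheduling above is invisible: it simply resumes after its call to `wait()`. A hedged usage sketch (assuming `Event` is exported at package level and exposes the `wait()`/`trigger()` pair suggested by the `giambio/objects.py` changes below):

```python
import giambio

async def waiter(event):
    print("waiting for the event...")
    await event.wait()
    print("woken up!")

async def main():
    async with giambio.create_pool() as pool:
        event = giambio.Event()
        pool.spawn(waiter, event)
        await giambio.sleep(1)
        # Wakes up every task that called wait() on the event;
        # triggering a second time raises GiambioError
        await event.trigger()

giambio.run(main)
```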
@@ -239,17 +252,19 @@
         has elapsed
         """
 
-        while self.paused and self.paused[0][0] < self.clock():
+        while self.paused and self.paused[0][0] <= self.clock():
             # Reschedules tasks when their deadline has elapsed
             task = self.paused.get()
-            slept = self.clock() - task.sleep_start
-            task.sleep_start = 0.0
-            self.tasks.append(task)
-            self.debugger.after_sleep(task, slept)
+            if not task.done():
+                slept = self.clock() - task.sleep_start
+                task.sleep_start = 0.0
+                self.tasks.append(task)
+                self.debugger.after_sleep(task, slept)
 
     def check_io(self):
         """
-        Checks and schedules task to perform I/O
+        Checks for I/O and implements the sleeping mechanism
+        for the event loop
         """
 
         before_time = self.clock()  # Used for the debugger
@@ -265,33 +280,28 @@
             # If there are either tasks or events and no I/O, don't wait
             # (unless we already skipped this check too many times)
             timeout = 0.0
-        elif self.paused:
-            # If there are asleep tasks, wait until the closest deadline
+        elif self.paused or self.deadlines:
+            # If there are asleep tasks or deadlines, wait until the closest date
             if not self.deadlines:
-                timeout = min([max(0.0, self.paused[0][0] - self.clock())])
+                # If there are no deadlines just wait until the first task wakeup
+                timeout = max(0.0, self.paused[0][0] - self.clock())
+            elif not self.paused:
+                # If there are no sleeping tasks just wait until the first deadline
+                timeout = max(0.0, self.deadlines[0][0] - self.clock())
             else:
-                deadline = self.deadlines.get()
-                timeout = min([max(0.0, self.paused[0][0] - self.clock()), deadline])
-                if timeout != deadline:
-                    # If a sleeping tasks has to run
-                    # before another deadline, we schedule the former
-                    # first and put back the latter on the queue
-                    self.deadlines.put(deadline)
+                # If there are both deadlines AND sleeping tasks scheduled, we calculate
+                # the absolute closest deadline among the two sets and use that as a timeout
+                clock = self.clock()
+                timeout = min(max(0.0, self.paused[0][0] - clock), max(0.0, self.deadlines[0][0] - clock))
         else:
             # If there is *only* I/O, we wait a fixed amount of time
-            timeout = 86400  # Thanks trio :D
-        if self.selector.get_map():
-            self.debugger.before_io(timeout)
-            io_ready = self.selector.select(timeout)
-            # Get sockets that are ready and schedule their tasks
-            for key, _ in io_ready:
-                self.tasks.append(key.data)  # Resource ready? Schedule its task
-            self.debugger.after_io(self.clock() - before_time)
-        else:
-            # Since select() does not work with 0 fds registered
-            # we need to call time.sleep() if we need to pause
-            # and no I/O has been registered
-            wait(timeout)
+            timeout = 86400  # Thanks trio :D
+        self.debugger.before_io(timeout)
+        io_ready = self.selector.select(timeout)
+        # Get sockets that are ready and schedule their tasks
+        for key, _ in io_ready:
+            self.tasks.append(key.data)  # Resource ready? Schedule its task
+        self.debugger.after_io(self.clock() - before_time)
 
     def start(self, func: types.FunctionType, *args):
         """
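All this branching computes is "time until the nearest wakeup, floored at zero". The same calculation as a standalone helper (hypothetical, not part of the patch; `paused` and `deadlines` stand in for the two min-heaps, whose closest entry always sits at index 0):

```python
def io_timeout(paused, deadlines, clock, max_wait=86400.0):
    # Each heap entry is an (absolute_deadline, ...) tuple kept in
    # heap order, so queue[0][0] is always the closest deadline
    candidates = []
    if paused:
        candidates.append(paused[0][0])
    if deadlines:
        candidates.append(deadlines[0][0])
    if not candidates:
        # Only I/O is pending: sleep for a long, fixed time
        return max_wait
    # An already-expired deadline must not produce a negative
    # timeout; clamp to zero so select() does a non-blocking poll
    return max(0.0, min(candidates) - clock())
```

The clamping is also why the rewritten branches above wrap every time difference in `max(0.0, ...)` before handing it to `selector.select()`.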
@@ -338,20 +348,41 @@
         if pool:
             for to_cancel in pool.tasks:
                 self.cancel(to_cancel)
-            pool.cancelled = True
-            return all([t.cancelled or t.finished or t.exc for t in pool.tasks])
+            # If pool.done() equals True, then self.join() can
+            # safely proceed and reschedule the parent of the
+            # current pool. If, however, there are still some
+            # tasks running, we wait for them to exit in order
+            # to avoid orphaned tasks
+            return pool.done()
         else:
             # If we're at the main task, we're sure everything else exited
             return True
 
+    def get_io_tasks(self):
+        """
+        Return all tasks waiting on I/O resources
+        """
+
+        return [k.data for k in self.selector.get_map().values()]
+
+    def get_all_tasks(self):
+        """
+        Returns all tasks which the loop is currently keeping track of.
+        This includes both running and paused tasks. A paused task is a
+        task which is either waiting on an I/O resource, sleeping, or
+        waiting on an event to be triggered
+        """
+
+        return chain(self.tasks, self.paused, self.get_event_tasks(), self.get_io_tasks())
+
     def cancel_all(self):
         """
         Cancels ALL tasks, this method is called as a result
         of self.close()
         """
 
-        for to_cancel in chain(self.tasks, self.paused, self.get_event_tasks()):
+        for to_cancel in self.get_all_tasks():
             self.cancel(to_cancel)
-        return all([t.cancelled or t.exc or t.finished for t in chain(self.tasks, self.paused, self.get_event_tasks())])
+        return all([t.done() for t in self.get_all_tasks()])
 
     def close(self, *, ensure_done: bool = True):
@@ -359,7 +390,7 @@
         inside it and tearing down any extra machinery.
         If ensure_done equals False, the loop will cancel *ALL*
         running and scheduled tasks and then tear itself down.
-        If ensure_done equals False, which is the default behavior,
+        If ensure_done equals True, which is the default behavior,
         this method will raise a GiambioError if the loop hasn't
         finished running.
         """
@@ -377,15 +408,9 @@
         task.join() on the task object)
         """
 
-        if self.current_pool is None:
-            if not self.done():
-                return
-            else:
-                self.reschedule_joiners(task)
-                return
         task.joined = True
         if task.finished or task.cancelled:
-            if all([t.finished or t.cancelled for t in self.current_pool.tasks]):
+            if not self.current_pool or self.current_pool.done():
                 self.reschedule_joiners(task)
         elif task.exc:
             if self.cancel_all_from_current_pool():
@@ -406,7 +431,7 @@
             self.current_task.status = "sleep"
             self.current_task.sleep_start = self.clock()
             self.paused.put(self.current_task, seconds)
-            self.current_task.next_deadline = self.clock() + seconds
+            self.current_task.next_deadline = self.current_task.sleep_start + seconds
         else:
             self.tasks.append(self.current_task)
 
@@ -416,7 +441,10 @@
         or does so straight away if it is safe to do so
         """
 
-        if task.status in ("io", "sleep", "init"):
+        if task.done():
+            # The task has already exited, nothing to do
+            return
+        elif task.status in ("io", "sleep", "init"):
             # We cancel immediately only in a context where it's safer to do
             # so. The concept of "safer" is quite tricky, because even though the
             # task is technically not running, it might leave some unfinished state
@@ -425,11 +453,21 @@
             try:
                 self.do_cancel(task)
             except CancelledError:
-                # Task was cancelled
+                # When a task needs to be cancelled, giambio tries to do it gracefully
+                # first: if the task is paused in either I/O or sleeping, that's perfect.
+                # But we also need to cancel a task if it was not sleeping or waiting on
+                # any I/O because it could never do so (therefore blocking everything
+                # forever). So, when cancellation can't be done right away, we schedule
+                # it for the next execution step of the task. Giambio will also make sure
+                # to re-raise cancellations at every checkpoint until the task lets the
+                # exception propagate into us, because we *really* want the task to be
+                # cancelled, and since asking kindly didn't work we have to use some
+                # force :)
                 task.status = "cancelled"
                 task.cancelled = True
                 task.cancel_pending = False
                 self.debugger.after_cancel(task)
+                self.paused.discard(task)
         else:
             # If we can't cancel in a somewhat "graceful" way, we just
             # defer this operation for later (check run() for more info)
@@ -534,10 +572,3 @@
 
             await want_write(sock)
         return sock.connect(addr)
-
-    def __del__(self):
-        """
-        Garbage collects itself
-        """
-
-        self.close()
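Taken together, `do_cancel()`, `cancel()` and the deferred `cancel_pending` path implement two-phase cancellation: immediate for tasks parked in I/O, sleep or init, deferred to the task's next checkpoint otherwise. What that means for user code, sketched against the `TaskManager.cancel()` entry point from `giambio/context.py` above (note the TODO there: that path is still flagged as broken):

```python
import giambio

async def sleeper():
    # Parked in sleep, so do_cancel() can throw CancelledError
    # into this task immediately
    await giambio.sleep(10)

async def main():
    async with giambio.create_pool() as pool:
        pool.spawn(sleeper)
        await giambio.sleep(1)
        # Requests cancellation of every child in the pool
        await pool.cancel()

giambio.run(main)
```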
diff --git a/giambio/objects.py b/giambio/objects.py
index 8d57e5e..c43c77e 100644
--- a/giambio/objects.py
+++ b/giambio/objects.py
@@ -17,7 +17,7 @@ limitations under the License.
 """
 
 from giambio.traps import join, cancel, event_set, event_wait
-from heapq import heappop, heappush
+from heapq import heappop, heappush, heapify
 from giambio.exceptions import GiambioError
 from dataclasses import dataclass, field
 import typing
@@ -80,6 +80,9 @@
     def __hash__(self):
         return hash(self.coroutine)
 
+    def done(self):
+        return self.exc or self.finished or self.cancelled
+
 
 class Event:
     """
@@ -101,7 +104,7 @@
         pause() on us
         """
 
-        if self.set:
+        if self.set:  # This is set by the event loop internally
             raise GiambioError("The event has already been set")
         await event_set(self)
 
@@ -132,7 +135,17 @@
         self.container = []
 
     def __contains__(self, item):
-        return item in self.container
+        for i in self.container:
+            if i[2] == item:
+                return True
+        return False
+
+    def discard(self, item):
+        for i in self.container:
+            if i[2] == item:
+                self.container.remove(i)
+                heapify(self.container)
+                return
 
     def __iter__(self):
         return self
@@ -180,6 +193,7 @@
         """
 
         super().__init__(None)
+        self.pools = set()
 
     def __contains__(self, item):
         return super().__contains__(item)
@@ -199,17 +213,22 @@
     def __repr__(self):
         return f"DeadlinesQueue({self.container})"
 
-    def put(self, amount: float):
+    def put(self, amount: float, pool):
         """
-        Pushes a deadline (timeout) onto the queue
+        Pushes a deadline (timeout) onto the queue with its associated
+        pool
         """
 
-        heappush(self.container, (amount, self.sequence))
-        self.sequence += 1
+        if pool not in self.pools:
+            self.pools.add(pool)
+            heappush(self.container, (amount, self.sequence, pool))
+            self.sequence += 1
 
     def get(self):
         """
         Gets the first task that is meant to run
         """
 
-        return super().get()
+        d = heappop(self.container)
+        self.pools.discard(d[2])
+        return d
\ No newline at end of file
diff --git a/giambio/socket.py b/giambio/socket.py
index a01261c..bc0c6df 100644
--- a/giambio/socket.py
+++ b/giambio/socket.py
@@ -94,4 +94,4 @@ def wrap_socket(sock: socket.socket) -> AsyncSocket:
     Wraps a standard socket into an async socket
     """
 
-    return AsyncSocket(sock)
\ No newline at end of file
+    return AsyncSocket(sock)
diff --git a/giambio/traps.py b/giambio/traps.py
index fa4672a..0c40e7a 100644
--- a/giambio/traps.py
+++ b/giambio/traps.py
@@ -59,7 +59,7 @@ async def sleep(seconds: int):
 
 async def current_task():
     """
-    Gets the currently running task
+    Gets the currently running task in an asynchronous fashion
     """
 
     return await create_trap("get_current")
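The `pools` set added above is what deduplicates deadlines, and the `sequence` counter it pushes alongside is load-bearing: it breaks ties between equal deadlines so that `heapq` never has to compare two pool objects directly (which would raise `TypeError`, since pools define no ordering). A self-contained stand-in illustrating the idea (not giambio's actual class, just a runnable sketch of the same semantics):

```python
import heapq

class MiniDeadlinesQueue:
    def __init__(self):
        self.container = []
        self.sequence = 0  # Tie-breaker for equal deadlines
        self.pools = set()

    def put(self, deadline: float, pool):
        # One deadline per pool: later calls are no-ops, which is
        # why a pool's timeout can't be changed once it is set
        if pool not in self.pools:
            self.pools.add(pool)
            heapq.heappush(self.container, (deadline, self.sequence, pool))
            self.sequence += 1

    def get(self):
        # Pop the closest deadline and let its pool be scheduled again
        deadline, seq, pool = heapq.heappop(self.container)
        self.pools.discard(pool)
        return deadline, seq, pool
```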
print("[child] Child spawned!! Sleeping for 2 seconds") await giambio.sleep(2) print("[child] Had a nice nap!") - # raise TypeError("rip") + raise TypeError("rip") async def child1(): - print("[child 1] Child spawned!! Sleeping for 4 seconds") - await giambio.sleep(4) + print("[child 1] Child spawned!! Sleeping for 8 seconds") + await giambio.sleep(8) print("[child 1] Had a nice nap!") - raise TypeError("rip") + # raise TypeError("rip") async def main(): diff --git a/tests/server.py b/tests/server.py index 3b17598..0a193a9 100644 --- a/tests/server.py +++ b/tests/server.py @@ -13,17 +13,16 @@ async def serve(address: tuple): sock.bind(address) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.listen(5) - asock = giambio.wrap_socket(sock) # We make the socket an async socket + async_sock = giambio.wrap_socket(sock) # We make the socket an async socket logging.info(f"Serving asynchronously at {address[0]}:{address[1]}") async with giambio.create_pool() as pool: - while True: - conn, addr = await asock.accept() - logging.info(f"{addr[0]}:{addr[1]} connected") - pool.spawn(handler, conn, addr) + conn, address_tuple = await async_sock.accept() + logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected") + pool.spawn(handler, conn, address_tuple) async def handler(sock: AsyncSocket, addr: tuple): - addr = f"{addr[0]}:{addr[1]}" + address = f"{addr[0]}:{addr[1]}" async with sock: await sock.send_all(b"Welcome to the server pal, feel free to send me something!\n") while True: @@ -36,17 +35,17 @@ async def handler(sock: AsyncSocket, addr: tuple): raise TypeError("Oh, no, I'm gonna die!") to_send_back = data data = data.decode("utf-8").encode("unicode_escape") - logging.info(f"Got: '{data.decode('utf-8')}' from {addr}") + logging.info(f"Got: '{data.decode('utf-8')}' from {address}") await sock.send_all(b"Got: " + to_send_back) - logging.info(f"Echoed back '{data.decode('utf-8')}' to {addr}") - logging.info(f"Connection from {addr} closed") + logging.info(f"Echoed back '{data.decode('utf-8')}' to {address}") + logging.info(f"Connection from {address} closed") if __name__ == "__main__": port = int(sys.argv[1]) if len(sys.argv) > 1 else 1500 logging.basicConfig(level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p") try: - giambio.run(serve, ("localhost", port), debugger=None) + giambio.run(serve, ("localhost", port), debugger=Debugger()) except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! if isinstance(error, KeyboardInterrupt): logging.info("Ctrl+C detected, exiting")