diff --git a/giambio/context.py b/giambio/context.py index 89450ef..d7b00a9 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -18,8 +18,8 @@ limitations under the License. import types -from .core import AsyncScheduler from .objects import Task +from .core import AsyncScheduler class TaskManager: @@ -40,7 +40,7 @@ class TaskManager: Spawns a child task """ - task = Task(func(*args), func.__name__ or str(func)) + task = Task(func(*args), func.__name__ or str(func), self) task.joiners = [self.loop.current_task] self.loop.tasks.append(task) self.loop.debugger.on_task_spawn(task) @@ -53,7 +53,7 @@ class TaskManager: """ assert n >= 0, "The time delay can't be negative" - task = Task(func(*args), func.__name__ or str(func)) + task = Task(func(*args), func.__name__ or str(func), self) task.joiners = [self.loop.current_task] task.sleep_start = self.loop.clock() self.loop.paused.put(task, n) diff --git a/giambio/core.py b/giambio/core.py index 6ba735b..9a12142 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -71,6 +71,8 @@ class AsyncScheduler: self.to_send = None # Have we ever ran? self.has_ran = False + # The current pool + self.current_pool = None def done(self): """ @@ -100,7 +102,7 @@ class AsyncScheduler: while True: try: if self.done(): - # If we're done, which means there is no + # If we're done, which means there is no # sleeping tasks, no events to deliver, # no I/O to do and no running tasks, we # simply tear us down and return to self.start @@ -111,14 +113,17 @@ class AsyncScheduler: # we try to schedule the asleep ones if self.paused: self.awake_sleeping() - # The next step is checking for I/O - self.check_io() + if self.selector.get_map(): + # The next step is checking for I/O + self.check_io() # Try to awake event-waiting tasks - self.check_events() + if self.events: + self.check_events() # Otherwise, while there are tasks ready to run, well, run them! while self.tasks: # Sets the currently running task self.current_task = self.tasks.pop(0) + self.current_pool = self.current_task.pool self.debugger.before_task_step(self.current_task) if self.current_task.cancel_pending: # We perform the deferred cancellation @@ -151,7 +156,6 @@ class AsyncScheduler: self.current_task.cancel_pending = False self.debugger.after_cancel(self.current_task) self.join(self.current_task) - # TODO: Do we need to join? except StopIteration as ret: # Coroutine ends self.current_task.status = "end" @@ -163,21 +167,22 @@ class AsyncScheduler: # Coroutine raised self.current_task.exc = err self.current_task.status = "crashed" + self.debugger.on_exception_raised(self.current_task, err) self.join(self.current_task) # This propagates the exception def do_cancel(self, task: Task = None): """ - Performs task cancellation by throwing CancelledError inside the current + Performs task cancellation by throwing CancelledError inside the given task in order to stop it from running. The loop continues to execute as tasks are independent """ task = task or self.current_task - if not task.cancelled: + if not task.cancelled and not task.exc: self.debugger.before_cancel(task) task.throw(CancelledError()) - def get_running(self): + def get_current(self): """ Returns the current task to an async caller """ @@ -218,8 +223,8 @@ class AsyncScheduler: Checks and schedules task to perform I/O """ - before_time = self.clock() - if self.tasks or self.events and not self.selector.get_map(): + before_time = self.clock() # Used for the debugger + if self.tasks or self.events: # If there are either tasks or events and no I/O, never wait timeout = 0.0 elif self.paused: @@ -227,13 +232,12 @@ class AsyncScheduler: timeout = max(0.0, self.paused[0][0] - self.clock()) else: # If there is *only* I/O, we wait a fixed amount of time - timeout = 1 + timeout = 1.0 self.debugger.before_io(timeout) - if self.selector.get_map(): - io_ready = self.selector.select(timeout) - # Get sockets that are ready and schedule their tasks - for key, _ in io_ready: - self.tasks.append(key.data) # Resource ready? Schedule its task + io_ready = self.selector.select(timeout) + # Get sockets that are ready and schedule their tasks + for key, _ in io_ready: + self.tasks.append(key.data) # Resource ready? Schedule its task self.debugger.after_io(self.clock() - before_time) def start(self, func: types.FunctionType, *args): @@ -241,7 +245,7 @@ class AsyncScheduler: Starts the event loop from a sync context """ - entry = Task(func(*args), func.__name__ or str(func)) + entry = Task(func(*args), func.__name__ or str(func), None) self.tasks.append(entry) self.debugger.on_start() self.run() @@ -267,32 +271,49 @@ class AsyncScheduler: def cancel_all(self): """ - Cancels all tasks in preparation for the exception - throwing from self.join + Cancels all tasks in the current pool, + preparing for the exception throwing + from self.join """ + to_reschedule = [] for to_cancel in chain(self.tasks, self.paused): try: - self.cancel(to_cancel) + if to_cancel.pool is self.current_pool: + self.cancel(to_cancel) + elif to_cancel.status == "sleep": + deadline = to_cancel.next_deadline - self.clock() + to_reschedule.append((to_cancel, deadline)) + else: + to_reschedule.append((to_cancel, None)) except CancelledError: to_cancel.status = "cancelled" to_cancel.cancelled = True to_cancel.cancel_pending = False self.debugger.after_cancel(to_cancel) self.tasks.remove(to_cancel) + for task, deadline in to_reschedule: + if deadline is not None: + self.paused.put(task, deadline) + else: + self.tasks.append(task) + # If there is other work to do (nested pools) + # we tell so to our caller + return bool(to_reschedule) def join(self, task: Task): """ - Handler for the 'join' event, does some magic to tell the scheduler - to wait until the current coroutine ends + Joins a task to its callers (implicitly, the parent + task, but also every other task who called await + task.join() on the task object) """ task.joined = True if task.finished or task.cancelled: self.reschedule_joinee(task) elif task.exc: - self.cancel_all() - self.reschedule_joinee(task) + if not self.cancel_all(): + self.reschedule_joinee(task) def sleep(self, seconds: int or float): """ @@ -300,16 +321,17 @@ class AsyncScheduler: """ self.debugger.before_sleep(self.current_task, seconds) - if seconds: + if seconds: # if seconds == 0, this acts as a switch! self.current_task.status = "sleep" self.current_task.sleep_start = self.clock() self.paused.put(self.current_task, seconds) + self.current_task.next_deadline = self.clock() + seconds else: self.tasks.append(self.current_task) def cancel(self, task: Task = None): """ - Handler for the 'cancel' event, schedules the task to be cancelled later + Schedules the task to be cancelled later or does so straight away if it is safe to do so """ @@ -336,44 +358,30 @@ class AsyncScheduler: """ event.waiters.append(self.current_task) + # Since we don't reschedule the task, it will + # not execute until check_events is called # TODO: More generic I/O rather than just sockets - def want_read(self, sock: socket.socket): + # Best way to do so? Probably threads + def read_or_write(self, sock: socket.socket, evt_type: str): """ - Handler for the 'want_read' event, registers the socket inside the + Registers the given socket inside the selector to perform I/0 multiplexing """ self.current_task.status = "io" if self.current_task.last_io: - if self.current_task.last_io == ("READ", sock): - # Socket is already scheduled! - return - self.selector.unregister(sock) - self.current_task.last_io = "READ", sock - try: - self.selector.register(sock, EVENT_READ, self.current_task) - except KeyError: - # The socket is already registered doing something else - raise ResourceBusy("The given resource is busy!") from None - - def want_write(self, sock: socket.socket): - """ - Handler for the 'want_write' event, registers the socket inside the - selector to perform I/0 multiplexing - """ - - self.current_task.status = "io" - if self.current_task.last_io: - if self.current_task.last_io == ("WRITE", sock): + if self.current_task.last_io == (evt_type, sock): # Socket is already scheduled! return # TODO: Inspect why modify() causes issues self.selector.unregister(sock) - self.current_task.last_io = "WRITE", sock + self.current_task.last_io = evt_type, sock + evt = EVENT_READ if evt_type == "read" else EVENT_WRITE try: - self.selector.register(sock, EVENT_WRITE, self.current_task) + self.selector.register(sock, evt, self.current_task) except KeyError: + # The socket is already registered doing something else raise ResourceBusy("The given resource is busy!") from None def wrap_socket(self, sock): diff --git a/giambio/objects.py b/giambio/objects.py index dee93d0..d182d95 100644 --- a/giambio/objects.py +++ b/giambio/objects.py @@ -32,6 +32,7 @@ class Task: coroutine: types.CoroutineType name: str + pool: "giambio.context.TaskManager" cancelled: bool = False exc: BaseException = None result: object = None @@ -43,6 +44,7 @@ class Task: joined: bool = False cancel_pending: bool = False sleep_start: float = 0.0 + next_deadline: float = 0.0 def run(self, what=None): """ @@ -164,4 +166,4 @@ class TimeQueue: Gets the first task that is meant to run """ - return heappop(self.container)[2] \ No newline at end of file + return heappop(self.container)[2] diff --git a/giambio/traps.py b/giambio/traps.py index e601205..ba08e8a 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -62,7 +62,7 @@ async def current_task(): Gets the currently running task """ - return await create_trap("get_running") + return await create_trap("get_current") async def join(task): @@ -90,7 +90,7 @@ async def cancel(task): """ await create_trap("cancel", task) - assert task.cancelled, f"Coroutine ignored CancelledError" + assert task.cancelled, f"Task ignored CancelledError" async def want_read(stream): @@ -101,7 +101,7 @@ async def want_read(stream): :param stream: The resource that needs to be read """ - await create_trap("want_read", stream) + await create_trap("read_or_write", stream, "read") async def want_write(stream): @@ -112,7 +112,7 @@ async def want_write(stream): :param stream: The resource that needs to be written """ - await create_trap("want_write", stream) + await create_trap("read_or_write", stream, "write") async def event_set(event): diff --git a/giambio/util/debug.py b/giambio/util/debug.py index 188e28d..c38c785 100644 --- a/giambio/util/debug.py +++ b/giambio/util/debug.py @@ -17,7 +17,6 @@ limitations under the License. """ from abc import ABC, abstractmethod from giambio.objects import Task -from typing import Union class BaseDebugger(ABC): @@ -44,7 +43,7 @@ class BaseDebugger(ABC): raise NotImplementedError @abstractmethod - def on_task_schedule(self, task: Task, delay: Union[int, float]): + def on_task_schedule(self, task: Task, delay: float): """ This method is called when a new task is scheduled (not spawned) @@ -54,7 +53,7 @@ class BaseDebugger(ABC): :type task: :class: giambio.objects.Task :param delay: The delay, in seconds, after which the task will start executing - :type delay: int + :type delay: float """ raise NotImplementedError @@ -111,7 +110,7 @@ class BaseDebugger(ABC): raise NotImplementedError @abstractmethod - def before_sleep(self, task: Task, seconds: Union[int, float]): + def before_sleep(self, task: Task, seconds: float): """ This method is called before a task goes to sleep @@ -127,7 +126,7 @@ class BaseDebugger(ABC): raise NotImplementedError @abstractmethod - def after_sleep(self, task: Task, seconds: Union[int, float]): + def after_sleep(self, task: Task, seconds: float): """ This method is called after a tasks awakes from sleeping @@ -137,13 +136,13 @@ class BaseDebugger(ABC): :type task: :class: giambio.objects.Task :param seconds: The amount of seconds the task actually slept - :type seconds: int + :type seconds: float """ raise NotImplementedError @abstractmethod - def before_io(self, timeout: Union[int, float]): + def before_io(self, timeout: float): """ This method is called right before the event loop checks for I/O events @@ -151,13 +150,13 @@ class BaseDebugger(ABC): :param timeout: The max. amount of seconds that the loop will hang when using the select() system call - :type timeout: int + :type timeout: float """ raise NotImplementedError @abstractmethod - def after_io(self, timeout: Union[int, float]): + def after_io(self, timeout: float): """ This method is called right after the event loop has checked for I/O events @@ -165,7 +164,7 @@ class BaseDebugger(ABC): :param timeout: The actual amount of seconds that the loop has hung when using the select() system call - :type timeout: int + :type timeout: float """ raise NotImplementedError @@ -196,3 +195,18 @@ class BaseDebugger(ABC): raise NotImplementedError + @abstractmethod + def on_exception_raised(self, task: Task, exc: BaseException): + """ + This method is called right after a task + has raised an exception + + :param task: The Task object representing a + giambio Task and wrapping a coroutine + :type task: :class: giambio.objects.Task + :param exc: The exception that was raised + :type exc: BaseException + """ + + raise NotImplementedError + diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/cancel.py b/tests/cancel.py new file mode 100644 index 0000000..c87cba9 --- /dev/null +++ b/tests/cancel.py @@ -0,0 +1,28 @@ +import giambio +from debugger import Debugger + + +async def child(): + print("[child] Child spawned!! Sleeping for 2 seconds") + await giambio.sleep(2) + print("[child] Had a nice nap!") + + +async def child1(): + print("[child 1] Child spawned!! Sleeping for 2 seconds") + await giambio.sleep(2) + print("[child 1] Had a nice nap!") + + +async def main(): + start = giambio.clock() + async with giambio.create_pool() as pool: + pool.spawn(child) + task = pool.spawn(child1) + await task.cancel() + print("[main] Children spawned, awaiting completion") + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") + + +if __name__ == "__main__": + giambio.run(main) diff --git a/tests/debugger.py b/tests/debugger.py new file mode 100644 index 0000000..7447fc3 --- /dev/null +++ b/tests/debugger.py @@ -0,0 +1,50 @@ +import giambio + + +class Debugger(giambio.debug.BaseDebugger): + """ + A simple debugger for giambio + """ + + def on_start(self): + print("## Started running") + + def on_exit(self): + print("## Finished running") + + def on_task_schedule(self, task, delay: int): + print(f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds") + + def on_task_spawn(self, task): + print(f">> A task named '{task.name}' was spawned") + + def on_task_exit(self, task): + print(f"<< Task '{task.name}' exited") + + def before_task_step(self, task): + print(f"-> About to run a step for '{task.name}'") + + def after_task_step(self, task): + print(f"<- Ran a step for '{task.name}'") + + def before_sleep(self, task, seconds): + print(f"# About to put '{task.name}' to sleep for {seconds:.2f} seconds") + + def after_sleep(self, task, seconds): + print(f"# Task '{task.name}' slept for {seconds:.2f} seconds") + + def before_io(self, timeout): + print(f"!! About to check for I/O for up to {timeout:.2f} seconds") + + def after_io(self, timeout): + print(f"!! Done I/O check (waited for {timeout:.2f} seconds)") + + def before_cancel(self, task): + print(f"// About to cancel '{task.name}'") + + def after_cancel(self, task): + print(f"// Cancelled '{task.name}'") + + def on_exception_raised(self, task, exc): + print(f"== '{task.name}' raised {repr(exc)}") + diff --git a/tests/exceptions.py b/tests/exceptions.py new file mode 100644 index 0000000..ac4cb39 --- /dev/null +++ b/tests/exceptions.py @@ -0,0 +1,32 @@ +import giambio +from debugger import Debugger + + +async def child(): + print("[child] Child spawned!! Sleeping for 2 seconds") + await giambio.sleep(2) + print("[child] Had a nice nap!") + raise TypeError("rip") # Watch the exception magically propagate! + + +async def child1(): + print("[child 1] Child spawned!! Sleeping for 2 seconds") + await giambio.sleep(2) + print("[child 1] Had a nice nap!") + + +async def main(): + start = giambio.clock() + try: + async with giambio.create_pool() as pool: + pool.spawn(child) + pool.spawn(child1) + print("[main] Children spawned, awaiting completion") + except Exception as error: + # Because exceptions just *work*! + print(f"[main] Exception from child caught! {repr(error)}") + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") + + +if __name__ == "__main__": + giambio.run(main, debugger=Debugger()) diff --git a/tests/nested_exception.py b/tests/nested_exception.py new file mode 100644 index 0000000..74417b6 --- /dev/null +++ b/tests/nested_exception.py @@ -0,0 +1,42 @@ +import giambio +from debugger import Debugger + + + +async def child(): + print("[child] Child spawned!! Sleeping for 2 seconds") + await giambio.sleep(2) + print("[child] Had a nice nap!") + raise TypeError("rip") # Watch the exception magically propagate! + + +async def child1(): + print("[child 1] Child spawned!! Sleeping for 2 seconds") + await giambio.sleep(2) + print("[child 1] Had a nice nap!") + + +async def child2(): + print("[child 2] Child spawned!! Sleeping for 4 seconds") + await giambio.sleep(4) + print("[child 2] Had a nice nap!") + + +async def main(): + start = giambio.clock() + try: + async with giambio.create_pool() as pool: + pool.spawn(child) + pool.spawn(child1) + print("[main] Children spawned, awaiting completion") + async with giambio.create_pool() as new_pool: + new_pool.spawn(child2) + print("[main] 3rd child spawned") + except Exception as error: + # Because exceptions just *work*! + print(f"[main] Exception from child caught! {repr(error)}") + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") + + +if __name__ == "__main__": + giambio.run(main, debugger=None) diff --git a/tests/sleep.py b/tests/sleep.py index 671834a..4184fbf 100644 --- a/tests/sleep.py +++ b/tests/sleep.py @@ -1,56 +1,10 @@ import giambio -class Debugger(giambio.debug.BaseDebugger): - """ - A simple debugger for this test - """ - - def on_start(self): - print("## Started running") - - def on_exit(self): - print("## Finished running") - - def on_task_schedule(self, task, delay: int): - print(f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds") - - def on_task_spawn(self, task): - print(f">> A task named '{task.name}' was spawned") - - def on_task_exit(self, task): - print(f"<< Task '{task.name}' exited") - - def before_task_step(self, task): - print(f"-> About to run a step for '{task.name}'") - - def after_task_step(self, task): - print(f"<- Ran a step for '{task.name}'") - - def before_sleep(self, task, seconds): - print(f"# About to put '{task.name}' to sleep for {seconds:.2f} seconds") - - def after_sleep(self, task, seconds): - print(f"# Task '{task.name}' slept for {seconds:.2f} seconds") - - def before_io(self, timeout): - print(f"!! About to check for I/O for up to {timeout:.2f} seconds") - - def after_io(self, timeout): - print(f"!! Done I/O check (waited for {timeout:.2f} seconds)") - - def before_cancel(self, task): - print(f"// About to cancel '{task.name}'") - - def after_cancel(self, task): - print(f"// Cancelled '{task.name}'") - - async def child(): print("[child] Child spawned!! Sleeping for 2 seconds") await giambio.sleep(2) print("[child] Had a nice nap!") - # raise TypeError("rip") # Uncomment this line and watch the exception magically propagate! async def child1(): @@ -61,14 +15,10 @@ async def child1(): async def main(): start = giambio.clock() - try: - async with giambio.create_pool() as pool: - pool.spawn(child) - pool.spawn(child1) - print("[main] Children spawned, awaiting completion") - except Exception as error: - # Because exceptions just *work*! - print(f"[main] Exception from child caught! {repr(error)}") + async with giambio.create_pool() as pool: + pool.spawn(child) + pool.spawn(child1) + print("[main] Children spawned, awaiting completion") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")