diff --git a/README.md b/README.md index b126a04..db44bfc 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,13 @@ Giambio means to take the best of all of its predecessors, while being: - Dependency-free: No fancy C modules, no external libraries, just pure idiomatic Python code - Community-based: I frankly wouldn't have bothered making this if curio was open to community additions +Another problem I would like to address and that I've heard some developers rant about is the lack of control +that the `run()` paradigm causes: you can read a way better and more detailed explanation [here](https://gist.github.com/Justasic/b57bfd05dd8e7a108bc433c8c9a66e59). +Giambio fixes this problem by exposing all of its internal machinery to the public and also allowing +to not start listening for events automatically by doing `AsyncScheduler(...).start(..., loop=False)`, in which case +the responsibility of handling everything (including loop ticks) is transferred to the end user allowing for a much +more granular control of the loop according to one's needs. + ## Current limitations giambio is **highly** experimental and there's a lot to work to do before it's usable. Namely: diff --git a/giambio/__init__.py b/giambio/__init__.py index 354b04c..4f98f2d 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -20,7 +20,7 @@ __author__ = "Nocturn9x" __version__ = (0, 0, 1) -from . import exceptions, socket, context, core +from . import exceptions, socket, context, core, task, io from .traps import sleep, current_task from .sync import Event from .run import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout diff --git a/giambio/context.py b/giambio/context.py index cd94e33..bd66fc5 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -34,15 +34,13 @@ class TaskManager: Object constructor """ - # The event loop associated with this pool - self.loop: giambio.core.AsyncScheduler = giambio.get_event_loop() # All the tasks that belong to this pool - self.tasks: List[giambio.objects.Task] = [] + self.tasks: List[giambio.task.Task] = [] # Whether we have been cancelled or not self.cancelled: bool = False # The clock time of when we started running, used for # timeouts expiration - self.started: float = self.loop.clock() + self.started: float = giambio.clock() # The pool's timeout (in seconds) if timeout: self.timeout: float = self.started + timeout @@ -57,7 +55,7 @@ class TaskManager: Spawns a child task """ - assert self._proper_init + assert self._proper_init, "Cannot use improperly initialized pool" return await giambio.traps.create_task(func, *args) async def __aenter__(self): diff --git a/giambio/core.py b/giambio/core.py index 94f6b48..a56e9e4 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -18,15 +18,11 @@ limitations under the License. # Import libraries and internal resources import types -import socket -from itertools import chain from giambio.task import Task -from giambio.sync import Event from timeit import default_timer from giambio.context import TaskManager -from typing import List, Optional, Set, Any +from typing import List, Optional, Any from giambio.util.debug import BaseDebugger -from giambio.traps import want_read, want_write from giambio.internal import TimeQueue, DeadlinesQueue from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from giambio.exceptions import ( @@ -105,7 +101,7 @@ class AsyncScheduler: # Tasks that are ready to run self.run_ready: List[Task] = [] # Selector object to perform I/O multiplexing - self.selector: DefaultSelector = DefaultSelector() + self.selector = selector or DefaultSelector() # This will always point to the currently running coroutine (Task object) self.current_task: Optional[Task] = None # Monotonic clock to keep track of elapsed time reliably @@ -134,11 +130,25 @@ class AsyncScheduler: Returns repr(self) """ - fields = {"debugger", "tasks", "run_ready", "selector", "current_task", - "clock", "paused", "has_ran", "current_pool", "io_skip", - "deadlines", "_data", "io_skip_limit", "io_max_timeout" - } - data = ", ".join(name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields))) + fields = { + "debugger", + "tasks", + "run_ready", + "selector", + "current_task", + "clock", + "paused", + "has_ran", + "current_pool", + "io_skip", + "deadlines", + "_data", + "io_skip_limit", + "io_max_timeout", + } + data = ", ".join( + name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields)) + ) return f"{type(self).__name__}({data})" def done(self) -> bool: @@ -146,18 +156,16 @@ class AsyncScheduler: Returns True if there is no work to do """ - if any([self.paused, self.run_ready, self.selector.get_map()]): - return False - return True + return not any([self.paused, self.run_ready, self.selector.get_map()]) def shutdown(self): """ Shuts down the event loop """ + for task in self.tasks: + self.io_release_task(task) self.selector.close() - self.tasks = [] - self.current_task = self.current_pool = None # TODO: Anything else? def run(self): @@ -198,11 +206,7 @@ class AsyncScheduler: if self.paused: # Next we try to (re)schedule the asleep tasks self.awake_sleeping() - if ( - self.current_pool - and self.current_pool.timeout - and not self.current_pool.timed_out - ): + if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out: # Stores deadlines for tasks (deadlines are pool-specific). # The deadlines queue will internally make sure not to store # a deadline for the same pool twice. This makes the timeouts @@ -232,14 +236,21 @@ class AsyncScheduler: self.current_task.exc = err self.join(self.current_task) - def create_task(self, coro, *args) -> Task: + def create_task(self, corofunc: types.FunctionType, *args, **kwargs) -> Task: """ - Creates a task + Creates a task from a coroutine function and schedules it + to run. Any extra keyword or positional argument are then + passed to the function + + :param corofunc: The coroutine function (not a coroutine!) to + spawn + :type corofunc: function """ - task = Task(coro.__name__ or str(coro), coro(*args), self.current_pool) + task = Task(corofunc.__name__ or str(corofunc), corofunc(*args, **kwargs), self.current_pool) task.next_deadline = self.current_pool.timeout or 0.0 task.joiners = {self.current_task} + self._data = task self.tasks.append(task) self.run_ready.append(task) self.debugger.on_task_spawn(task) @@ -251,7 +262,7 @@ class AsyncScheduler: """ Runs a single step for the current task. A step ends when the task awaits any of - giambio's primitives or async methods. + our primitives or async methods. Note that this method does NOT catch any exception arising from tasks, nor does it @@ -278,7 +289,7 @@ class AsyncScheduler: # somewhere) method, *args = self.current_task.run(data) if data is self._data: - self._data = None + self._data = None # Some debugging and internal chatter here self.current_task.status = "run" self.current_task.steps += 1 @@ -290,8 +301,7 @@ class AsyncScheduler: # compared to us. If you get this exception and you're 100% sure you're # not mixing async primitives from other libraries, then it's a bug! raise InternalError( - "Uh oh! Something very bad just happened, did" - " you try to mix primitives from other async libraries?" + "Uh oh! Something very bad just happened, did you try to mix primitives from other async libraries?" ) from None # Sneaky method call, thanks to David Beazley for this ;) getattr(self, method)(*args) @@ -320,12 +330,19 @@ class AsyncScheduler: if self.selector.get_map() and sock in self.selector.get_map(): self.selector.unregister(sock) - def suspend(self): + def suspend(self, task: Task): """ - Suspends execution of the current task + Suspends execution of the given task. This is basically + a do-nothing method, since it will not reschedule the task + before returning. The task will stay suspended as long as + something else outside the loop calls a trap to reschedule it. + + This method will unregister any I/O as well to ensure the task + isn't rescheduled in further calls to select() """ - ... # TODO: Unschedule I/O? + if task.last_io: + self.io_release_task(task) def reschedule_running(self): """ @@ -334,6 +351,8 @@ class AsyncScheduler: if self.current_task: self.run_ready.append(self.current_task) + else: + raise GiambioError("giambio is not running") def do_cancel(self, task: Task): """ @@ -357,7 +376,6 @@ class AsyncScheduler: self._data = self.current_task self.reschedule_running() - def get_current_pool(self): """ 'Returns' the current pool to an async caller @@ -366,7 +384,6 @@ class AsyncScheduler: self._data = self.current_pool self.reschedule_running() - def get_current_loop(self): """ 'Returns' self to an async caller @@ -394,6 +411,7 @@ class AsyncScheduler: """ self.run_ready.extend(tasks) + self.reschedule_running() def awake_sleeping(self): """ @@ -470,7 +488,9 @@ class AsyncScheduler: def start(self, func: types.FunctionType, *args, loop: bool = True): """ - Starts the event loop from a sync context + Starts the event loop from a sync context. If the loop parameter + is false, the event loop will not start listening for events + automatically and the dispatching is on the users' shoulders """ entry = Task(func.__name__ or str(func), func(*args), None) @@ -502,7 +522,7 @@ class AsyncScheduler: else: # If we're at the main task, we're sure everything else exited return True - def get_all_tasks(self) -> chain: + def get_all_tasks(self) -> List[Task]: """ Returns a list of all the tasks the loop is currently keeping track of: this includes both running and paused tasks. @@ -536,9 +556,7 @@ class AsyncScheduler: if ensure_done: self.cancel_all() elif not self.done(): - raise GiambioError( - "event loop not terminated, call this method with ensure_done=False to forcefully exit" - ) + raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit") self.shutdown() def reschedule_joiners(self, task: Task): @@ -639,9 +657,7 @@ class AsyncScheduler: # or dangling resource open after being cancelled, so maybe we need # a different approach altogether if task.status == "io": - for k in filter( - lambda o: o.data == task, dict(self.selector.get_map()).values() - ): + for k in filter(lambda o: o.data == task, dict(self.selector.get_map()).values()): self.selector.unregister(k.fileobj) elif task.status == "sleep": self.paused.discard(task) @@ -711,26 +727,4 @@ class AsyncScheduler: self.selector.register(sock, evt, self.current_task) except KeyError: # The socket is already registered doing something else - raise ResourceBusy( - "The given socket is being read/written by another task" - ) from None - - # noinspection PyMethodMayBeStatic - async def connect_sock(self, sock: socket.socket, address_tuple: tuple): - """ - Connects a socket asynchronously to a given endpoint - - :param sock: The socket that must to be connected - :type sock: socket.socket - :param address_tuple: A tuple in the same form as the one - passed to socket.socket.connect with an address as a string - and a port as an integer - :type address_tuple: tuple - """ - - await want_write(sock) - try: - return sock.connect(address_tuple) - except BlockingIOError: - await want_write(sock) - return sock.connect(address_tuple) + raise ResourceBusy("The given socket is being read/written by another task") from None diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 2d29fbe..f03207d 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -95,7 +95,9 @@ class ErrorStack(GiambioError): tracebacks = "" for i, err in enumerate(self.errors): if i not in (1, len(self.errors)): - tracebacks += f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}\n{'-' * 32}\n" + tracebacks += ( + f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}\n{'-' * 32}\n" + ) else: tracebacks += f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}" return f"Multiple errors occurred:\n{tracebacks}" diff --git a/giambio/internal.py b/giambio/internal.py index 4a9f82d..5c06562 100644 --- a/giambio/internal.py +++ b/giambio/internal.py @@ -15,8 +15,11 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ + +import giambio +from typing import List, Tuple from giambio.task import Task -from heapq import heappush, heappop +from heapq import heappush, heappop, heapify class TimeQueue: diff --git a/giambio/io.py b/giambio/io.py index 07962e6..9c0d898 100644 --- a/giambio/io.py +++ b/giambio/io.py @@ -169,7 +169,7 @@ class AsyncSocket: Wrapper socket method """ - raise RuntimeError('Use with_timeout() to set a timeout') + raise RuntimeError("Use with_timeout() to set a timeout") def gettimeout(self): """ @@ -205,15 +205,15 @@ class AsyncSocket: try: result = self.sock.connect(address) - if getattr(self, 'do_handshake_on_connect', False): + if getattr(self, "do_handshake_on_connect", False): await self.do_handshake() return result except WantWrite: await want_write(self.sock) err = self.sock.getsockopt(SOL_SOCKET, SO_ERROR) if err != 0: - raise OSError(err, f'Connect call failed {address}') - if getattr(self, 'do_handshake_on_connect', False): + raise OSError(err, f"Connect call failed {address}") + if getattr(self, "do_handshake_on_connect", False): await self.do_handshake() async def recvfrom(self, buffersize, flags=0): diff --git a/giambio/sync.py b/giambio/sync.py index c752386..84e2816 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -16,6 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. """ from giambio.traps import event_wait, event_set +from giambio.exceptions import GiambioError class Event: @@ -38,7 +39,7 @@ class Event: """ if self.set: - raise giambio.exceptions.GiambioError("The event has already been set") + raise GiambioError("The event has already been set") await event_set(self) async def wait(self): diff --git a/giambio/task.py b/giambio/task.py index f2c7e35..e15c5bf 100644 --- a/giambio/task.py +++ b/giambio/task.py @@ -18,7 +18,7 @@ limitations under the License. import giambio from dataclasses import dataclass, field -from typing import Union, Coroutine, List, Tuple, Set +from typing import Union, Coroutine, Set @dataclass @@ -104,8 +104,9 @@ class Task: are propagated as well """ - self.joiners.add(await giambio.traps.current_task()) - print(self.joiners) + task = await giambio.traps.current_task() + if task: + self.joiners.add(task) res = await giambio.traps.join(self) if self.exc: raise self.exc @@ -142,5 +143,4 @@ class Task: self.coroutine.close() except RuntimeError: pass # TODO: This is kinda bad - assert not self.last_io - + assert not self.last_io, f"task {self.name} was destroyed, but has pending I/O" diff --git a/giambio/traps.py b/giambio/traps.py index 7a35fec..c73d574 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -25,7 +25,7 @@ import types import inspect from giambio.task import Task from types import FunctionType -from typing import List, Union, Iterable +from typing import Union, Iterable from giambio.exceptions import GiambioError @@ -58,7 +58,7 @@ async def create_task(coro: FunctionType, *args): elif inspect.iscoroutinefunction(coro): return await create_trap("create_task", coro, *args) else: - raise TypeError("coro must be a coroutine or coroutine function") + raise TypeError("coro must be a coroutine function") async def sleep(seconds: Union[int, float]): @@ -122,7 +122,7 @@ async def join(task): Awaits a given task for completion :param task: The task to join - :type task: class: Task + :type task: :class: Task """ return await create_trap("join", task) @@ -187,7 +187,6 @@ async def event_set(event): """ event.set = True - await reschedule_running() await schedule_tasks(event.waiters) @@ -197,11 +196,3 @@ async def schedule_tasks(tasks: Iterable[Task]): """ await create_trap("schedule_tasks", tasks) - - -async def reschedule_running(): - """ - Reschedules the current task for execution - """ - - await create_trap("reschedule_running") diff --git a/tests/cancel.py b/tests/cancel.py index 6981960..1527002 100644 --- a/tests/cancel.py +++ b/tests/cancel.py @@ -11,15 +11,11 @@ async def child(name: int): async def main(): start = giambio.clock() async with giambio.create_pool() as pool: - await pool.spawn( - child, 1 - ) # If you comment this line, the pool will exit immediately! + await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately! task = await pool.spawn(child, 2) await task.cancel() print("[main] Children spawned, awaiting completion") - print( - f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds" - ) + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") if __name__ == "__main__": diff --git a/tests/debugger.py b/tests/debugger.py index d2619f0..8cb807f 100644 --- a/tests/debugger.py +++ b/tests/debugger.py @@ -13,9 +13,7 @@ class Debugger(giambio.debug.BaseDebugger): print("## Finished running") def on_task_schedule(self, task, delay: int): - print( - f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds" - ) + print(f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds") def on_task_spawn(self, task): print(f">> A task named '{task.name}' was spawned") diff --git a/tests/events.py b/tests/events.py index c8175c7..6f5938c 100644 --- a/tests/events.py +++ b/tests/events.py @@ -15,9 +15,7 @@ async def child(ev: giambio.Event, pause: int): await giambio.sleep(pause) end_sleep = giambio.clock() - start_sleep end_total = giambio.clock() - start_total - print( - f"[child] Done! Slept for {end_total} seconds total ({end_pause} paused, {end_sleep} sleeping), nice nap!" - ) + print(f"[child] Done! Slept for {end_total} seconds total ({end_pause} paused, {end_sleep} sleeping), nice nap!") async def parent(pause: int = 1): diff --git a/tests/exceptions.py b/tests/exceptions.py index 66b8c7b..82ce4b0 100644 --- a/tests/exceptions.py +++ b/tests/exceptions.py @@ -26,9 +26,7 @@ async def main(): except Exception as error: # Because exceptions just *work*! print(f"[main] Exception from child caught! {repr(error)}") - print( - f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds" - ) + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") if __name__ == "__main__": diff --git a/tests/nested_exception.py b/tests/nested_exception.py index 5ae10c8..005dbc1 100644 --- a/tests/nested_exception.py +++ b/tests/nested_exception.py @@ -43,9 +43,7 @@ async def main(): except Exception as error: # Because exceptions just *work*! print(f"[main] Exception from child caught! {repr(error)}") - print( - f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds" - ) + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") if __name__ == "__main__": diff --git a/tests/nested_pool.py b/tests/nested_pool.py index e8ef641..023f418 100644 --- a/tests/nested_pool.py +++ b/tests/nested_pool.py @@ -19,9 +19,7 @@ async def main(): print("[main] Children spawned, awaiting completion") # This will *only* execute when everything inside the async with block # has ran, including any other pool - print( - f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds" - ) + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") if __name__ == "__main__": diff --git a/tests/server.py b/tests/server.py index 998ec40..a8f7235 100644 --- a/tests/server.py +++ b/tests/server.py @@ -29,9 +29,7 @@ async def handler(sock: AsyncSocket, client_address: tuple): """ Handles a single client connection - :param sock: The giambio.socket.AsyncSocket object connected - to the client - :type sock: :class: giambio.socket.AsyncSocket + :param sock: The AsyncSocket object connected to the client :param client_address: The client's address represented as a tuple (address, port) where address is a string and port is an integer :type client_address: tuple @@ -39,9 +37,7 @@ async def handler(sock: AsyncSocket, client_address: tuple): address = f"{client_address[0]}:{client_address[1]}" async with sock: # Closes the socket automatically - await sock.send_all( - b"Welcome to the server pal, feel free to send me something!\n" - ) + await sock.send_all(b"Welcome to the server pal, feel free to send me something!\n") while True: await sock.send_all(b"-> ") data = await sock.receive(1024) @@ -49,9 +45,7 @@ async def handler(sock: AsyncSocket, client_address: tuple): break elif data == b"exit\n": await sock.send_all(b"I'm dead dude\n") - raise TypeError( - "Oh, no, I'm gonna die!" - ) # This kills the entire application! + raise TypeError("Oh, no, I'm gonna die!") # This kills the entire application! logging.info(f"Got: {data!r} from {address}") await sock.send_all(b"Got: " + data) logging.info(f"Echoed back {data!r} to {address}") diff --git a/tests/sleep.py b/tests/sleep.py index 67ef90d..834d905 100644 --- a/tests/sleep.py +++ b/tests/sleep.py @@ -19,9 +19,7 @@ async def main(): await pool.spawn(child) await pool.spawn(child1) print("[main] Children spawned, awaiting completion") - print( - f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds" - ) + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") if __name__ == "__main__": diff --git a/tests/timeout.py b/tests/timeout.py index 01c89ca..ac841f6 100644 --- a/tests/timeout.py +++ b/tests/timeout.py @@ -16,9 +16,7 @@ async def main(): await child(20) # TODO: Broken except giambio.exceptions.TooSlowError: print("[main] One or more children have timed out!") - print( - f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds" - ) + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") if __name__ == "__main__":