From 981a598ae795379a472f3d10f3c8897e01e7d8cc Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Mon, 16 Nov 2020 08:07:19 +0100 Subject: [PATCH] Various improvements to async pools --- giambio/__init__.py | 14 +- giambio/_core.py | 335 ---------------------- giambio/_traps.py | 118 -------- giambio/{_managers.py => context.py} | 31 +-- giambio/core.py | 401 +++++++++++++++++++++++++++ giambio/{_layers.py => objects.py} | 80 ++++-- giambio/{_run.py => run.py} | 47 +++- giambio/socket.py | 41 ++- giambio/traps.py | 134 +++++++++ tests/count.py | 29 +- tests/server.py | 14 +- 11 files changed, 685 insertions(+), 559 deletions(-) delete mode 100644 giambio/_core.py delete mode 100644 giambio/_traps.py rename giambio/{_managers.py => context.py} (67%) create mode 100644 giambio/core.py rename giambio/{_layers.py => objects.py} (64%) rename giambio/{_run.py => run.py} (68%) create mode 100644 giambio/traps.py diff --git a/giambio/__init__.py b/giambio/__init__.py index 921c37c..45f118c 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -16,10 +16,13 @@ limitations under the License. __author__ = "Nocturn9x aka Isgiambyy" __version__ = (1, 0, 0) -from ._run import run, clock, wrap_socket, create_pool + + from .exceptions import GiambioError, AlreadyJoinedError, CancelledError -from ._traps import sleep -from ._layers import Event +from .traps import sleep, current_task +from .objects import Event +from .run import run, clock, wrap_socket, create_pool, get_event_loop, new_event_loop + __all__ = [ "GiambioError", @@ -30,5 +33,8 @@ __all__ = [ "run", "clock", "wrap_socket", - "create_pool" + "create_pool", + "get_event_loop", + "current_task", + "new_event_loop" ] diff --git a/giambio/_core.py b/giambio/_core.py deleted file mode 100644 index 32c0c22..0000000 --- a/giambio/_core.py +++ /dev/null @@ -1,335 +0,0 @@ -""" -Copyright (C) 2020 nocturn9x - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -# Import libraries and internal resources -import types -from collections import defaultdict -from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE -import socket -from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy, GiambioError -from timeit import default_timer -from time import sleep as wait -from .socket import AsyncSocket, WantWrite, WantRead -from ._layers import Task, TimeQueue -from socket import SOL_SOCKET, SO_ERROR -from ._traps import want_read, want_write -import traceback, sys - - -class AsyncScheduler: - """ - An asynchronous scheduler toy implementation. Tries to mimic the threaded - model in its simplicity, without using actual threads, but rather alternating - across coroutines execution to let more than one thing at a time to proceed - with its calculations. An attempt to fix the threaded model underlying pitfalls - and weaknesses has been made, without making the API unnecessarily complicated. - A few examples are tasks cancellation and exception propagation. - Can perform (unreliably) socket I/O asynchronously. 
- """ - - def __init__(self): - """Object constructor""" - - self.tasks = [] # Tasks that are ready to run - self.selector = DefaultSelector() # Selector object to perform I/O multiplexing - self.current_task = None # This will always point to the currently running coroutine (Task object) - self.joined = ( - {} - ) # Maps child tasks that need to be joined their respective parent task - self.clock = ( - default_timer # Monotonic clock to keep track of elapsed time reliably - ) - self.some_cancel = False - self.paused = TimeQueue(self.clock) # Tasks that are asleep - self.events = set() # All Event objects - self.event_waiting = defaultdict(list) # Coroutines waiting on event objects - self.sequence = 0 - - def _run(self): - """ - Starts the loop and 'listens' for events until there are either ready or asleep tasks, - then exit. This behavior kinda reflects a kernel, as coroutines can request - the loop's functionality only trough some fixed entry points, which in turn yield and - give execution control to the loop itself. - """ - - while True: - try: - if not self.selector.get_map() and not any( - [self.paused, self.tasks, self.event_waiting] - ): # If there is nothing to do, just exit - break - elif not self.tasks: - if self.paused: - # If there are no actively running tasks, we try to schedule the asleep ones - self._check_sleeping() - if self.selector.get_map(): - self._check_io() # The next step is checking for I/O - if self.event_waiting: - # Try to awake event-waiting tasks - self._check_events() - while self.tasks: # While there are tasks to run - self.current_task = self.tasks.pop(0) - if self.some_cancel: - self._check_cancel() - # Sets the currently running task - method, *args = self.current_task.run() # Run a single step with the calculation - self.current_task.status = "run" - getattr(self, f"_{method}")(*args) - # Sneaky method call, thanks to David Beazley for this ;) - except CancelledError: - self.current_task.cancelled = True - self._reschedule_parent() - except StopIteration as e: # Coroutine ends - self.current_task.result = e.args[0] if e.args else None - self.current_task.finished = True - self._reschedule_parent() - except RuntimeError: - continue - except BaseException as error: # Coroutine raised - print(error) - self.current_task.exc = error - self._reschedule_parent() - self._join(self.current_task) - raise - - def _check_cancel(self): - """ - Checks for task cancellation - """ - - if self.current_task.status == "cancel": # Deferred cancellation - self.current_task.cancelled = True - self.current_task.throw(CancelledError(self.current_task)) - - def _check_events(self): - """ - Checks for ready or expired events and triggers them - """ - - for event, tasks in self.event_waiting.copy().items(): - if event._set: - event.event_caught = True - self.tasks.extend(tasks + [event.notifier]) - self.event_waiting.pop(event) - - def _check_sleeping(self): - """ - Checks and reschedules sleeping tasks - """ - - wait(max(0.0, self.paused[0][0] - self.clock())) - # Sleep until the closest deadline in order not to waste CPU cycles - while self.paused[0][0] < self.clock(): - # Reschedules tasks when their deadline has elapsed - self.tasks.append(self.paused.get()) - if not self.paused: - break - - def _check_io(self): - """ - Checks and schedules task to perform I/O - """ - - timeout = 0.0 if self.tasks else None - # If there are no tasks ready wait indefinitely - io_ready = self.selector.select(timeout) - # Get sockets that are ready and schedule their tasks - for key, _ in 
io_ready: - self.tasks.append(key.data) # Resource ready? Schedule its task - - def start(self, func: types.FunctionType, *args): - """ - Starts the event loop from a sync context - """ - - entry = Task(func(*args)) - self.tasks.append(entry) - self._join(entry) # TODO -> Inspect this line, does it actually do anything useful? - self._run() - return entry - - def _reschedule_parent(self): - """ - Reschedules the parent task of the - currently running task, if any - """ - - parent = self.joined.pop(self.current_task, None) - if parent: - self.tasks.append(parent) - return parent - - # TODO: More generic I/O rather than just sockets - def _want_read(self, sock: socket.socket): - """ - Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing - """ - - self.current_task.status = "I/O" - if self.current_task._last_io: - if self.current_task._last_io == ("READ", sock): - return # Socket is already scheduled! - else: - self.selector.unregister(sock) - self.current_task._last_io = "READ", sock - try: - self.selector.register(sock, EVENT_READ, self.current_task) - except KeyError: # The socket is already registered doing something else - raise ResourceBusy("The given resource is busy!") from None - - def _want_write(self, sock: socket.socket): - """ - Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing - """ - - self.current_task.status = "I/O" - if self.current_task._last_io: - if self.current_task._last_io == ("WRITE", sock): - return # Socket is already scheduled! - else: - self.selector.unregister(sock) # modify() causes issues - self.current_task._last_io = "WRITE", sock - try: - self.selector.register(sock, EVENT_WRITE, self.current_task) - except KeyError: - raise ResourceBusy("The given resource is busy!") from None - - def _join(self, child: types.coroutine): - """ - Handler for the 'join' event, does some magic to tell the scheduler - to wait until the passed coroutine ends. The result of this call equals whatever the - coroutine returns or, if an exception gets raised, the exception will get propagated inside the - parent task - """ - - if child.cancelled or child.exc: # Task was cancelled or has errored - self._reschedule_parent() - elif child.finished: # Task finished running - self.tasks.append(self.current_task) # Task has already finished - else: - if child not in self.joined: - self.joined[child] = self.current_task - else: - raise AlreadyJoinedError( - "Joining the same task multiple times is not allowed!" - ) - - def _sleep(self, seconds: int or float): - """ - Puts the caller to sleep for a given amount of seconds - """ - - if seconds: - self.current_task.status = "sleep" - self.paused.put(self.current_task, seconds) - else: - self.tasks.append(self.current_task) - - def _event_set(self, event): - """ - Sets an event - """ - - event.notifier = self.current_task - event._set = True - self.events.add(event) - - def _event_wait(self, event): - """ - Waits for an event - """ - - if event in self.events: - event.waiting -= 1 - if event.waiting <= 0: - return self.events.remove(event) - else: - return - else: - self.event_waiting[event].append(self.current_task) - - def _cancel(self, task): - """ - Handler for the 'cancel' event, throws CancelledError inside a coroutine - in order to stop it from executing. 
The loop continues to execute as tasks - are independent - """ - - if not self.some_cancel: - self.some_cancel = True - task.status = "cancel" # Cancellation is deferred - - def wrap_socket(self, sock): - """ - Wraps a standard socket into an AsyncSocket object - """ - - return AsyncSocket(sock, self) - - async def _read_sock(self, sock: socket.socket, buffer: int): - """ - Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes - from the socket - """ - - await want_read(sock) - try: - return sock.recv(buffer) - except WantRead: - await want_write(sock) - return sock.recv(buffer) - - async def _accept_sock(self, sock: socket.socket): - """ - Accepts a socket connection asynchronously, waiting until the resource is available and returning the - result of the accept() call - """ - - await want_read(sock) - return sock.accept() - - async def _sock_sendall(self, sock: socket.socket, data: bytes): - """ - Sends all the passed data, as bytes, trough the socket asynchronously - """ - - while data: - await want_write(sock) - sent_no = sock.send(data) - data = data[sent_no:] - - async def _close_sock(self, sock: socket.socket): - """ - Closes the socket asynchronously - """ - - await want_write(sock) - self.selector.unregister(sock) - return sock.close() - - async def _connect_sock(self, sock: socket.socket, addr: tuple): - """ - Connects a socket asynchronously - """ - - try: # "Borrowed" from curio - return sock.connect(addr) - except WantWrite: - await want_write(sock) - err = sock.getsockopt(SOL_SOCKET, SO_ERROR) - if err != 0: - raise OSError(err, f"Connect call failed: {addr}") diff --git a/giambio/_traps.py b/giambio/_traps.py deleted file mode 100644 index bda1c37..0000000 --- a/giambio/_traps.py +++ /dev/null @@ -1,118 +0,0 @@ -""" -Copyright (C) 2020 nocturn9x - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -# Helper methods to interact with the event loop -# These coroutines are the one and only way to interact -# with the event loop from the user's perspective, and -# the entire library is based on these traps - -import types -import socket - - -@types.coroutine -def sleep(seconds: int): - """Pause the execution of a coroutine for the passed amount of seconds, - without blocking the entire event loop, which keeps watching for other events - - This function is also useful as a sort of checkpoint, because it returns the execution - control to the scheduler, which can then switch to another task. If a coroutine does not have - enough calls to async methods (or 'checkpoints'), e.g one that needs the 'await' keyword before it, this might - affect performance as it would prevent the scheduler from switching tasks properly. 
If you feel - like this happens in your code, try adding a call to giambio.sleep(0); this will act as a checkpoint without - actually pausing the execution of your coroutine - - :param seconds: The amount of seconds to sleep for - :type seconds: int - """ - - assert seconds >= 0, "The time delay can't be negative" - yield "sleep", seconds - - -@types.coroutine -def join(task): - """'Tells' the scheduler that the desired task MUST be awaited for completion - - :param task: The task to join - :type task: class: Task - """ - - res = yield "join", task - return task.result - - -@types.coroutine -def cancel(task): - """ - 'Tells' the scheduler that the passed task must be cancelled - - The concept of cancellation here is tricky, because there is no real way to 'stop' a - running task if not by raising an exception inside it and just ignore whatever the task - returns (and also hoping that the task won't cause collateral damage when exiting abruptly). - It is highly recommended that when you write a coroutine you take into account that it might - be cancelled at any time. Please note, though, that ignoring a giambio.exceptions.CancelledError - exception *will* break your code, so if you really wanna do that be sure to re-raise - it when done! - """ - - yield "cancel", task - assert task.cancelled, f"Coroutine ignored CancelledError" - - -@types.coroutine -def want_read(sock: socket.socket): - """ - 'Tells' the event loop that there is some coroutine that wants to read from the given socket - - :param sock: The socket to perform the operation on - :type sock: class: socket.socket - """ - - yield "want_read", sock - - -@types.coroutine -def want_write(sock: socket.socket): - """ - 'Tells' the event loop that there is some coroutine that wants to write on the given socket - - :param sock: The socket to perform the operation on - :type sock: class: socket.socket - """ - - yield "want_write", sock - - -@types.coroutine -def event_set(event): - """Communicates to the loop that the given event object - must be set. This is important as the loop constantly - checks for active events to deliver them - """ - - yield "event_set", event - - -@types.coroutine -def event_wait(event): - """ - Notifies the event loop that the current task has to wait - for the event to trigger - """ - - msg = yield "event_wait", event - return msg diff --git a/giambio/_managers.py b/giambio/context.py similarity index 67% rename from giambio/_managers.py rename to giambio/context.py index bbe6fcb..8ff98aa 100644 --- a/giambio/_managers.py +++ b/giambio/context.py @@ -14,9 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. 
""" -from ._core import AsyncScheduler -from ._layers import Task + import types +from .core import AsyncScheduler +from .objects import Task class TaskManager: @@ -30,15 +31,17 @@ class TaskManager: """ self.loop = loop + self.tasks = [] def spawn(self, func: types.FunctionType, *args): """ Spawns a child task """ - task = Task(func(*args)) + task = Task(func(*args), func.__name__ or str(func)) + task.parent = self.loop.current_task self.loop.tasks.append(task) - return task + self.tasks.append(task) def spawn_after(self, func: types.FunctionType, n: int, *args): """ @@ -46,23 +49,19 @@ class TaskManager: """ assert n >= 0, "The time delay can't be negative" - task = Task(func(*args)) + task = Task(func(*args), func.__name__ or str(func)) + task.parent = self.loop.current_task self.loop.paused.put(task, n) - return task + self.tasks.append(task) async def __aenter__(self): return self async def __aexit__(self, exc_type, exc, tb): - for task in self.loop.tasks: + for task in self.tasks: try: await task.join() - except BaseException as e: - for running_task in self.loop.tasks: - await running_task.cancel() - for _, __, asleep_task in self.loop.paused: - await asleep_task.cancel() - for waiting_tasks in self.loop.event_waiting.values(): - for waiting_task in waiting_tasks: - await waiting_task.cancel() - raise e + except BaseException as task_error: + for dead in self.tasks: + await dead.cancel() + raise task.exc diff --git a/giambio/core.py b/giambio/core.py new file mode 100644 index 0000000..dfb5790 --- /dev/null +++ b/giambio/core.py @@ -0,0 +1,401 @@ +""" +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Import libraries and internal resources +import types +import socket +from time import sleep as wait +from timeit import default_timer +from .objects import Task, TimeQueue +from socket import SOL_SOCKET, SO_ERROR +from .traps import want_read, want_write +from collections import defaultdict, deque +from .socket import AsyncSocket, WantWrite, WantRead +from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE +from .exceptions import ( + AlreadyJoinedError, + CancelledError, + ResourceBusy, + GiambioError + ) + + +# The main runtime environment for giambio + +class AsyncScheduler: + """ + An asynchronous scheduler implementation. Tries to mimic the threaded + model in its simplicity, without using actual threads, but rather alternating + across coroutines execution to let more than one thing at a time to proceed + with its calculations. An attempt to fix the threaded model has been made + without making the API unnecessarily complicated. + A few examples are tasks cancellation and exception propagation. 
+ """ + + def __init__(self): + """ + Object constructor + """ + + # Tasks that are ready to run + self.tasks = deque() + # Selector object to perform I/O multiplexing + self.selector = DefaultSelector() + # This will always point to the currently running coroutine (Task object) + self.current_task = None + # Monotonic clock to keep track of elapsed time reliably + self.clock = default_timer + # Tasks that are asleep + self.paused = TimeQueue(self.clock) + # All active Event objects + self.events = set() + # Coroutines waiting on event objects + self.event_waiting = defaultdict(list) + # Data to send back to a trap + self.to_send = None + # Have we ever ran? + self.has_ran = False + + def done(self): + """ + Returns True if there is work to do + """ + + if self.selector.get_map() or any([self.paused, self.tasks, self.event_waiting]): + return False + return True + + def shutdown(self): + """ + Shuts down the event loop + """ + + self.selector.close() + + def run(self): + """ + Starts the loop and 'listens' for events until there is work to do, + then exits. This behavior kinda reflects a kernel, as coroutines can + request the loop's functionality only trough some fixed entry points, + which in turn yield and give execution control to the loop itself. + """ + + while True: + try: + if self.done(): + self.shutdown() + break + elif not self.tasks: + if self.paused: + # If there are no actively running tasks + # we try to schedule the asleep ones + self.awake_sleeping() + if self.selector.get_map(): + # The next step is checking for I/O + self.check_io() + if self.event_waiting: + # Try to awake event-waiting tasks + self.trigger_events() + # While there are tasks to run + while self.tasks: + # Sets the currently running task + self.current_task = self.tasks.popleft() + if self.current_task.cancel_pending: + self.do_cancel() + if self.to_send and self.current_task.status != "init": + data = self.to_send + else: + data = None + # Run a single step with the calculation + method, *args = self.current_task.run(data) + self.current_task.status = "run" + self.current_task.steps += 1 + # Data has been sent, reset it to None + if self.to_send and self.current_task != "init": + self.to_send = None + # Sneaky method call, thanks to David Beazley for this ;) + getattr(self, method)(*args) + except CancelledError: + self.current_task.status = "end" + self.current_task.cancelled = True + self.current_task.cancel_pending = False + self.join(self.current_task, self.current_task.parent) + except StopIteration as ret: + # Coroutine ends + self.current_task.status = "end" + self.current_task.result = ret.value + self.current_task.finished = True + except BaseException as err: + self.current_task.exc = err + self.current_task.status = "crashed" + self.join(self.current_task, self.current_task.parent) + + def do_cancel(self): + """ + Performs task cancellation by throwing CancelledError inside the current + task in order to stop it from executing. 
+        as tasks are independent
+        """
+
+        self.current_task.throw(CancelledError)
+
+    def get_running(self):
+        """
+        Handler for the 'get_running' event, returns the current task to the caller
+        """
+
+        self.tasks.append(self.current_task)
+        self.to_send = self.current_task
+
+    def trigger_events(self):
+        """
+        Checks for ready or expired events and triggers them
+        """
+
+        for event, tasks in self.event_waiting.copy().items():
+            if event.set:
+                event.event_caught = True
+                self.tasks.extend(tasks + [event.notifier])
+                self.event_waiting.pop(event)
+
+    def awake_sleeping(self):
+        """
+        Checks for and reschedules sleeping tasks
+        """
+
+        wait(max(0.0, self.paused[0][0] - self.clock()))
+        # Sleep until the closest deadline in order not to waste CPU cycles
+        while self.paused[0][0] < self.clock():
+            # Reschedules tasks when their deadline has elapsed
+            self.tasks.append(self.paused.get())
+            if not self.paused:
+                break
+
+    def check_io(self):
+        """
+        Checks for and schedules tasks to perform I/O
+        """
+
+        # Don't block if there are tasks ready to run, otherwise wait indefinitely
+        timeout = 0.0 if self.tasks else None
+        for key in dict(self.selector.get_map()).values():
+            if key.data.finished:
+                self.selector.unregister(key.fileobj)
+        if self.selector.get_map():
+            io_ready = self.selector.select(timeout)
+            # Get sockets that are ready and schedule their tasks
+            for key, _ in io_ready:
+                self.tasks.append(key.data)  # Resource ready? Schedule its task
+
+    def start(self, func: types.FunctionType, *args):
+        """
+        Starts the event loop from a sync context
+        """
+
+        entry = Task(func(*args), func.__name__ or str(func))
+        self.tasks.append(entry)
+        self.run()
+        self.has_ran = True
+        if entry.exc:
+            raise entry.exc from None
+
+    def reschedule_parent(self):
+        """
+        Reschedules the parent task of the
+        currently running task, if any
+        """
+
+        parent = self.current_task.parent
+        if parent:
+            self.tasks.append(parent)
+        return parent
+
+    def reschedule_joinee(self):
+        """
+        Reschedules the tasks that are waiting on
+        the currently running task, if any
+        """
+
+        self.tasks.extend(self.current_task.waiters)
+
+    def join(self, child: types.coroutine, parent):
+        """
+        Handler for the 'join' event, does some magic to tell the scheduler
+        that the given parent must wait until the child coroutine ends
+        """
+
+        child.joined = True
+        if parent:
+            # The parent will be woken up when the child exits
+            child.waiters.append(parent)
+        if child.cancelled or child.exc:
+            # Task was cancelled or has errored
+            if child.parent:
+                self.tasks.append(child.parent)
+            self.tasks.extend(child.waiters)
+        elif child.finished:
+            # Task finished normally: reschedule whoever was waiting on it
+            self.tasks.extend(child.waiters)
+
+    def sleep(self, seconds: int or float):
+        """
+        Puts the caller to sleep for a given amount of seconds
+        """
+
+        if seconds:
+            self.current_task.status = "sleep"
+            self.paused.put(self.current_task, seconds)
+        else:
+            self.tasks.append(self.current_task)
+
+    # TODO: More generic I/O rather than just sockets
+    def want_read(self, sock: socket.socket):
+        """
+        Handler for the 'want_read' event, registers the socket inside the selector to perform I/O multiplexing
+        """
+
+        self.current_task.status = "I/O"
+        if self.current_task.last_io:
+            if self.current_task.last_io == ("READ", sock):
+                # Socket is already scheduled!
+ return + else: + self.selector.unregister(sock) + self.current_task.last_io = "READ", sock + try: + self.selector.register(sock, EVENT_READ, self.current_task) + except KeyError: + # The socket is already registered doing something else + raise ResourceBusy("The given resource is busy!") from None + + def want_write(self, sock: socket.socket): + """ + Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing + """ + + self.current_task.status = "I/O" + if self.current_task.last_io: + if self.current_task.last_io == ("WRITE", sock): + # Socket is already scheduled! + return + else: + # modify() causes issues + self.selector.unregister(sock) + self.current_task.last_io = "WRITE", sock + try: + self.selector.register(sock, EVENT_WRITE, self.current_task) + except KeyError: + raise ResourceBusy("The given resource is busy!") from None + + def event_set(self, event): + """ + Sets an event + """ + + event.notifier = self.current_task + event.set = True + self.events.add(event) + + def event_wait(self, event): + """ + Waits for an event + """ + + if event in self.events: + event.waiting -= 1 + if event.waiting <= 0: + return self.events.remove(event) + else: + return + else: + self.event_waiting[event].append(self.current_task) + + def cancel(self, task): + """ + Handler for the 'cancel' event, sets the task to be cancelled later + """ + + task.cancel_pending = True # Cancellation is deferred + + def wrap_socket(self, sock): + """ + Wraps a standard socket into an AsyncSocket object + """ + + return AsyncSocket(sock, self) + + async def read_sock(self, sock: socket.socket, buffer: int): + """ + Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes + from the socket + """ + + try: + return sock.recv(buffer) + except WantRead: + await want_read(sock) + return sock.recv(buffer) + + async def accept_sock(self, sock: socket.socket): + """ + Accepts a socket connection asynchronously, waiting until the resource is available and returning the + result of the accept() call + """ + + try: + return sock.accept() + except WantRead: + await want_read(sock) + return sock.accept() + + async def sock_sendall(self, sock: socket.socket, data: bytes): + """ + Sends all the passed data, as bytes, trough the socket asynchronously + """ + + while data: + try: + sent_no = sock.send(data) + except WantWrite: + await want_write(sock) + sent_no = sock.send(data) + data = data[sent_no:] + + async def close_sock(self, sock: socket.socket): + """ + Closes the socket asynchronously + """ + + await want_write(sock) + self.selector.unregister(sock) + sock.setblocking(False) + return sock.close() + + async def connect_sock(self, sock: socket.socket, addr: tuple): + """ + Connects a socket asynchronously + """ + + try: # "Borrowed" from curio + return sock.connect(addr) + except WantWrite: + await want_write(sock) + err = sock.getsockopt(SOL_SOCKET, SO_ERROR) + if err != 0: + raise OSError(err, f"Connect call failed: {addr}") diff --git a/giambio/_layers.py b/giambio/objects.py similarity index 64% rename from giambio/_layers.py rename to giambio/objects.py index 3dd52c5..28c9de9 100644 --- a/giambio/_layers.py +++ b/giambio/objects.py @@ -15,65 +15,79 @@ limitations under the License. 
""" import types -from ._traps import join, cancel, event_set, event_wait +from .traps import join, cancel, event_set, event_wait from heapq import heappop, heappush from .exceptions import GiambioError +from dataclasses import dataclass, field +@dataclass class Task: - """A simple wrapper around a coroutine object""" + """ + A simple wrapper around a coroutine object + """ - def __init__(self, coroutine: types.coroutine): - self.coroutine = coroutine - self.cancelled = False # True if the task gets cancelled - self.exc = None - self.result = None - self.finished = False - self.status = "init" # This is useful for cancellation - self._last_io = None + coroutine: types.CoroutineType + name: str + cancelled: bool = False # True if the task gets cancelled + exc: BaseException = None + result: object = None + finished: bool = False + status: str = "init" + steps: int = 0 + last_io: tuple = () + parent: object = None + joined: bool= False + cancel_pending: bool = False + waiters: list = field(default_factory=list) def run(self, what=None): - """Simple abstraction layer over the coroutines ``send`` method""" + """ + Simple abstraction layer over coroutines' ``send`` method + """ return self.coroutine.send(what) def throw(self, err: Exception): - """Simple abstraction layer over the coroutines ``throw`` method""" + """ + Simple abstraction layer over coroutines ``throw`` method + """ return self.coroutine.throw(err) async def join(self): - """Joins the task""" + """ + Joins the task + """ - if self.cancelled and not self.exc: - return None - if self.exc: - raise self.exc res = await join(self) if self.exc: raise self.exc return res async def cancel(self): - """Cancels the task""" + """ + Cancels the task + """ await cancel(self) - # await join(self) # TODO -> Join ourselves after cancellation? - def __repr__(self): - """Implements repr(self)""" - - return f"Task({self.coroutine}, cancelled={self.cancelled}, exc={repr(self.exc)}, result={self.result}, finished={self.finished}, status={self.status})" + def __del__(self): + self.coroutine.close() class Event: - """A class designed similarly to threading.Event""" + """ + A class designed similarly to threading.Event + """ def __init__(self): - """Object constructor""" + """ + Object constructor + """ - self._set = False + self.set = False self.event_caught = False self.timeout = None self.waiting = 0 @@ -81,18 +95,20 @@ class Event: async def set(self): """ Sets the event, waking up all tasks that called - pause() on this event + pause() on us """ - if self._set: + if self.set: raise GiambioError("The event has already been set") await event_set(self) async def pause(self): - """Waits until the event is set and returns a value""" + """ + Waits until the event is set + """ self.waiting += 1 - return await event_wait(self) + await event_wait(self) class TimeQueue: @@ -102,6 +118,10 @@ class TimeQueue: """ def __init__(self, clock): + """ + Object constructor + """ + self.clock = clock self.sequence = 0 self.container = [] diff --git a/giambio/_run.py b/giambio/run.py similarity index 68% rename from giambio/_run.py rename to giambio/run.py index 2511a13..c8d9ced 100644 --- a/giambio/_run.py +++ b/giambio/run.py @@ -14,19 +14,49 @@ See the License for the specific language governing permissions and limitations under the License. 
""" +import socket import threading -from ._core import AsyncScheduler -from ._layers import Task -from ._managers import TaskManager +from .core import AsyncScheduler +from .exceptions import GiambioError +from .context import TaskManager from .socket import AsyncSocket from types import FunctionType, CoroutineType, GeneratorType -import socket thread_local = threading.local() -def run(func: FunctionType, *args) -> Task: +def get_event_loop(): + """ + Returns the event loop associated to the current + thread + """ + + try: + return thread_local.loop + except AttributeError: + raise GiambioError("no event loop set") from None + + +def new_event_loop(): + """ + Associates a new event loop to the current thread + and deactivates the old one. This should not be + called explicitly unless you know what you're doing + """ + + try: + loop = thread_local.loop + except AttributeError: + thread_local.loop = AsyncScheduler() + else: + if not loop.done(): + raise GiambioError("cannot set event loop while running") + else: + thread_local.loop = AsyncScheduler() + + +def run(func: FunctionType, *args): """ Starts the event loop from a synchronous entry point """ @@ -34,11 +64,8 @@ def run(func: FunctionType, *args) -> Task: if isinstance(func, (CoroutineType, GeneratorType)): raise RuntimeError("Looks like you tried to call giambio.run(your_func(arg1, arg2, ...)), that is wrong!" "\nWhat you wanna do, instead, is this: giambio.run(your_func, arg1, arg2, ...)") - try: - return thread_local.loop.start(func, *args) - except AttributeError: - thread_local.loop = AsyncScheduler() - return thread_local.loop.start(func, *args) + new_event_loop() + thread_local.loop.start(func, *args) def clock(): diff --git a/giambio/socket.py b/giambio/socket.py index 60d16c6..cbc2768 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -20,8 +20,9 @@ limitations under the License. 
import socket from .exceptions import ResourceClosed -from ._traps import sleep +from .traps import sleep +# Stolen from curio try: from ssl import SSLWantReadError, SSLWantWriteError WantRead = (BlockingIOError, InterruptedError, SSLWantReadError) @@ -32,7 +33,9 @@ except ImportError: class AsyncSocket(object): - """Abstraction layer for asynchronous sockets""" + """ + Abstraction layer for asynchronous sockets + """ def __init__(self, sock: socket.socket, loop): self.sock = sock @@ -41,46 +44,54 @@ class AsyncSocket(object): self._closed = False async def receive(self, max_size: int): - """Receives up to max_size from a socket asynchronously""" + """ + Receives up to max_size from a socket asynchronously + """ if self._closed: raise ResourceClosed("I/O operation on closed socket") - self.loop.current_task.status = "I/O" - return await self.loop._read_sock(self.sock, max_size) + return await self.loop.read_sock(self.sock, max_size) async def accept(self): - """Accepts the socket, completing the 3-step TCP handshake asynchronously""" + """ + Accepts the socket, completing the 3-step TCP handshake asynchronously + """ if self._closed: raise ResourceClosed("I/O operation on closed socket") - to_wrap = await self.loop._accept_sock(self.sock) + to_wrap = await self.loop.accept_sock(self.sock) return self.loop.wrap_socket(to_wrap[0]), to_wrap[1] async def send_all(self, data: bytes): - """Sends all data inside the buffer asynchronously until it is empty""" + """ + Sends all data inside the buffer asynchronously until it is empty + """ if self._closed: raise ResourceClosed("I/O operation on closed socket") - return await self.loop._sock_sendall(self.sock, data) + return await self.loop.sock_sendall(self.sock, data) async def close(self): - """Closes the socket asynchronously""" + """ + Closes the socket asynchronously + """ if self._closed: raise ResourceClosed("I/O operation on closed socket") - await sleep(0) # Give the scheduler the time to unregister the socket first - await self.loop._close_sock(self.sock) + await self.loop.close_sock(self.sock) self._closed = True async def connect(self, addr: tuple): - """Connects the socket to an endpoint""" + """ + Connects the socket to an endpoint + """ if self._closed: raise ResourceClosed("I/O operation on closed socket") - await self.loop._connect_sock(self.sock, addr) + await self.loop.connect_sock(self.sock, addr) async def __aenter__(self): - return self.sock.__enter__() + return self async def __aexit__(self, *_): await self.close() diff --git a/giambio/traps.py b/giambio/traps.py new file mode 100644 index 0000000..cb7f05d --- /dev/null +++ b/giambio/traps.py @@ -0,0 +1,134 @@ +""" +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +# Implementation for all giambio traps, which are hooks +# into the event loop and allow it to switch tasks +# These coroutines are the one and only way to interact +# with the event loop from the user's perspective, and +# the entire library is based on these traps + + +import types + + +@types.coroutine +def create_trap(method, *args): + """ + Creates and yields a trap. This + is the lowest-level method to + interact with the event loop + """ + + data = yield method, *args + return data + + +async def sleep(seconds: int): + """ + Pause the execution of an async function for a given amount of seconds, + without blocking the entire event loop, which keeps watching for other events + + This function is also useful as a sort of checkpoint, because it returns + control to the scheduler, which can then switch to another task. If your code + doesn't have enough calls to async functions (or 'checkpoints') this might + prevent the scheduler from switching tasks properly. If you feel like this + happens in your code, try adding a call to giambio.sleep(0) somewhere. + This will act as a checkpoint without actually pausing the execution + of your function, but it will allow the scheduler to switch tasks + + :param seconds: The amount of seconds to sleep for + :type seconds: int + """ + + assert seconds >= 0, "The time delay can't be negative" + await create_trap("sleep", seconds) + + +async def current_task(): + """ + Gets the currently running task + """ + + return await create_trap("get_running") + + +async def join(task): + """ + Awaits a given task for completion + + :param task: The task to join + :type task: class: Task + """ + + return await create_trap("join", task, await current_task()) + + +async def cancel(task): + """ + Cancels the given task + + The concept of cancellation is tricky, because there is no real way to 'stop' + a task if not by raising an exception inside it and ignoring whatever it + returns (and also hoping that the task won't cause collateral damage). It + is highly recommended that when you write async code you take into account + that it might be cancelled at any time. You might think to just ignore the + cancellation exception and be done with it, but doing so *will* break your + code, so if you really wanna do that be sure to re-raise it when done! + """ + + await create_trap("cancel", task) + assert task.cancelled, f"Coroutine ignored CancelledError" + + +async def want_read(stream): + """ + Notifies the event loop that a task that wants to read from the given + resource + + :param stream: The resource that needs to be read + """ + + await create_trap("want_read", stream) + + +async def want_write(stream): + """ + Notifies the event loop that a task that wants to read from the given + resource + + :param stream: The resource that needs to be written + """ + + await create_trap("want_write", stream) + + +async def event_set(event): + """ + Communicates to the loop that the given event object + must be set. 
This is important as the loop constantly + checks for active events to deliver them + """ + + await create_trap("event_set", event) + + +async def event_wait(event): + """ + Notifies the event loop that the current task has to wait + for given event to trigger + """ + + await create_trap("event_wait", event) diff --git a/tests/count.py b/tests/count.py index ee26da6..83f61c3 100644 --- a/tests/count.py +++ b/tests/count.py @@ -9,7 +9,7 @@ async def countdown(n: int): print(f"Down {n}") n -= 1 await giambio.sleep(1) - # raise Exception("oh no man") # Uncomment to test propagation +# raise Exception("oh no man") # Uncomment to test propagation print("Countdown over") return 0 @@ -18,7 +18,7 @@ async def countup(stop: int, step: int = 1): x = 0 while x < stop: print(f"Up {x}") - x += 1 + x += step await giambio.sleep(step) print("Countup over") return 1 @@ -26,30 +26,13 @@ async def countup(stop: int, step: int = 1): async def main(): try: - print("Creating an async pool") async with giambio.create_pool() as pool: - print("Starting counters") pool.spawn(countdown, 10) - count_up = pool.spawn(countup, 5, 2) - # raise Exception - # Raising an exception here has a weird - # Behavior: The exception is propagated - # *after* all the child tasks complete, - # which is not what we want - # print("Sleeping for 2 seconds before cancelling") - # await giambio.sleep(2) - # await count_up.cancel() # TODO: Cancel _is_ broken, this does not re-schedule the parent! - # print("Cancelled countup") - print("Task execution complete") + pool.spawn(countup, 5, 2) except Exception as e: - print(f"Caught this bad boy in here, propagating it -> {type(e).__name__}: {e}") - raise + print(f"Got -> {type(e).__name__}: {e}") + print("Task execution complete") if __name__ == "__main__": - print("Starting event loop") - try: - giambio.run(main) - except BaseException as error: - print(f"Exception caught from main event loop! -> {type(error).__name__}: {error}") - print("Event loop done") + giambio.run(main) diff --git a/tests/server.py b/tests/server.py index 9ab945b..fd693f0 100644 --- a/tests/server.py +++ b/tests/server.py @@ -3,6 +3,7 @@ from giambio.socket import AsyncSocket import socket import logging import sys +import traceback # A test to check for asynchronous I/O @@ -15,14 +16,10 @@ async def serve(address: tuple): sock.listen(5) asock = giambio.wrap_socket(sock) # We make the socket an async socket logging.info(f"Serving asynchronously at {address[0]}:{address[1]}") - while True: - try: - async with giambio.create_pool() as pool: - conn, addr = await asock.accept() - logging.info(f"{addr[0]}:{addr[1]} connected") - pool.spawn(handler, conn, addr) - except TypeError: - print("Looks like we have a naughty boy here!") + async with giambio.create_pool() as pool: + conn, addr = await asock.accept() + logging.info(f"{addr[0]}:{addr[1]} connected") + pool.spawn(handler, conn, addr) async def handler(sock: AsyncSocket, addr: tuple): @@ -51,6 +48,7 @@ if __name__ == "__main__": try: giambio.run(serve, ("localhost", port)) except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! + raise if isinstance(error, KeyboardInterrupt): logging.info("Ctrl+C detected, exiting") else: