From f55826d534aa08cbd1a8c15d8f39be930af12cc4 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Fri, 23 Apr 2021 09:17:55 +0200 Subject: [PATCH] Moved socket functionality out of the loop and added some more functions to the socket module (updating examples) --- README.md | 22 +++++---- giambio/__init__.py | 4 +- giambio/context.py | 10 ++--- giambio/core.py | 106 +++++++------------------------------------- giambio/objects.py | 14 +++--- giambio/run.py | 7 ++- giambio/socket.py | 86 +++++++++++++++++++++++++++++++---- tests/server.py | 13 +++--- tests/timeout.py | 12 ++--- 9 files changed, 132 insertions(+), 142 deletions(-) diff --git a/README.md b/README.md index aa632ed..6702a95 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,13 @@ rock-solid and structured concurrency framework (I personally recommend trio and that most of the content of this document is ~~stolen~~ inspired from its documentation) +## Current limitations + +As I already mentioned, giambio is **highly** experimental and there's a lot to work to do before it's usable. Namely: +- Ensure cancellations work 100% of the time even when `await`ing functions and not spawning them +- Extend I/O functionality +- Add task synchronization primitives such as locks and semaphores (events *sort of* work now) + # What the hell is async anyway? Libraries like giambio shine the most when it comes to performing asynchronous I/O (reading from a socket, writing to a file, that sort of thing). @@ -539,29 +546,26 @@ clients and dispatch them to some other handler. ```python import giambio -import socket import logging async def serve(bind_address: tuple): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(bind_address) - sock.listen(5) - async_sock = giambio.wrap_socket(sock) # We make the socket an async socket + sock = giambio.socket.socket() + await sock.bind(bind_address) + await sock.listen(5) logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}") async with giambio.create_pool() as pool: while True: - conn, address_tuple = await async_sock.accept() + conn, address_tuple = await sock.accept() logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected") pool.spawn(handler, conn, address_tuple) ``` So, our `serve` function does a few things: -- Sets up our server socket, just like in a synchronous server (notice how we bind and listen **before** wrapping it) -- Uses giambio's `wrap_socket` function to wrap the plain old synchronous socket into an async one +- Sets up our server socket, just like in a synchronous server - Opens a task pool and starts listening for clients in loop by using our new `giambio.socket.AsyncSocket` object - - Notice how we use `await async_sock.accept()` and not `sock.accept()`, because that could block the loop + - Notice how we use `await sock.accept()` and not `sock.accept()`, because that is an asynchronous socket! - Once a client connects, we log some information, spawn a new task and pass it the client socket: that is our client handler So, let's go over the declaration of `handler` then: diff --git a/giambio/__init__.py b/giambio/__init__.py index 062e70e..c8f5a9d 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -21,7 +21,6 @@ __version__ = (0, 0, 1) from . 
import exceptions, socket, context, core -from .socket import wrap_socket from .traps import sleep, current_task from .objects import Event from .run import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout @@ -36,11 +35,10 @@ __all__ = [ "Event", "run", "clock", - "wrap_socket", "create_pool", "with_timeout", "get_event_loop", "current_task", "new_event_loop", - "debug" + "debug", ] diff --git a/giambio/context.py b/giambio/context.py index d696e2f..46d8dc1 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -23,22 +23,19 @@ from typing import List class TaskManager: """ - An asynchronous context manager for giambio + An asynchronous context manager for giambio, similar to trio's nurseries - :param loop: The event loop bound to this pool. Most of the times - it's the return value from giambio.get_event_loop() - :type loop: :class: AsyncScheduler :param timeout: The pool's timeout length in seconds, if any, defaults to None :type timeout: float, optional """ - def __init__(self, loop: "giambio.core.AsyncScheduler", timeout: float = None) -> None: + def __init__(self, timeout: float = None) -> None: """ Object constructor """ # The event loop associated with this pool - self.loop: "giambio.core.AsyncScheduler" = loop + self.loop: giambio.core.AsyncScheduler = giambio.get_event_loop() # All the tasks that belong to this pool self.tasks: List[giambio.objects.Task] = [] # Whether we have been cancelled or not @@ -85,7 +82,6 @@ class TaskManager: async def __aenter__(self): """ Implements the asynchronous context manager interface, - marking the pool as started and returning itself """ return self diff --git a/giambio/core.py b/giambio/core.py index 2cc761e..f43b819 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -142,6 +142,13 @@ class AsyncScheduler: # Then we try to awake event-waiting tasks if self.events: self.check_events() + if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out: + # Stores deadlines for tasks (deadlines are pool-specific). + # The deadlines queue will internally make sure not to store + # a deadline for the same pool twice. This makes the timeouts + # model less flexible, because one can't change the timeout + # after it is set, but it makes the implementation easier. + self.deadlines.put(self.current_pool) # Otherwise, while there are tasks ready to run, we run them! while self.tasks: # Sets the currently running task @@ -151,13 +158,6 @@ class AsyncScheduler: # we still need to make sure we don't try to execute # exited tasks that are on the running queue continue - if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out: - # Stores deadlines for tasks (deadlines are pool-specific). - # The deadlines queue will internally make sure not to store - # a deadline for the same pool twice. This makes the timeouts - # model less flexible, because one can't change the timeout - # after it is set, but it makes the implementation easier. 
- self.deadlines.put(self.current_pool) self.debugger.before_task_step(self.current_task) if self.current_task.cancel_pending: # We perform the deferred cancellation @@ -244,12 +244,8 @@ class AsyncScheduler: while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock(): pool = self.deadlines.get() pool.timed_out = True - self.cancel_pool(pool) - # Since we already know that exceptions will behave correctly - # (heck, half of the code in here only serves that purpose) - # all we do here is just raise an exception as if the current - # task raised it and let our machinery deal with the rest - raise TooSlowError() + if not self.current_task.done(): + self.current_task.throw(TooSlowError()) def check_events(self): """ @@ -393,8 +389,9 @@ class AsyncScheduler: Yields all tasks currently waiting on I/O resources """ - for k in self.selector.get_map().values(): - yield k.data + if self.selector.get_map(): + for k in self.selector.get_map().values(): + yield k.data def get_all_tasks(self) -> chain: """ @@ -407,7 +404,8 @@ class AsyncScheduler: return chain(self.tasks, self.get_asleep_tasks(), self.get_event_tasks(), - self.get_io_tasks()) + self.get_io_tasks(), + [self.current_task]) def ensure_discard(self, task: Task): """ @@ -472,12 +470,13 @@ class AsyncScheduler: # occur self.tasks.append(t) + # noinspection PyMethodMayBeStatic def is_pool_done(self, pool: TaskManager) -> bool: """ Returns true if the given pool has finished running and can be safely terminated - :return: Whether the pool and any enclosing pools finished running + :return: Whether the pool finished running :rtype: bool """ @@ -647,79 +646,6 @@ class AsyncScheduler: # The socket is already registered doing something else raise ResourceBusy("The given socket is being read/written by another task") from None - # noinspection PyMethodMayBeStatic - async def read_sock(self, sock: socket.socket, buffer: int): - """ - Reads from a socket asynchronously, waiting until the resource is - available and returning up to buffer bytes from the socket - - :param sock: The socket that must be read - :type sock: socket.socket - :param buffer: The maximum amount of bytes that will be read - :type buffer: int - """ - - await want_read(sock) - return sock.recv(buffer) - - # noinspection PyMethodMayBeStatic - async def accept_sock(self, sock: socket.socket): - """ - Accepts a socket connection asynchronously, waiting until the resource - is available and returning the result of the sock.accept() call - - :param sock: The socket that must be accepted - :type sock: socket.socket - """ - - await want_read(sock) - try: - return sock.accept() - except BlockingIOError: - # Some platforms (namely OSX systems) act weird and handle - # the errno 35 signal (EAGAIN) for sockets in a weird manner, - # and this seems to fix the issue. 
Not sure about why since we - # already called want_read above, but it ain't stupid if it works I guess - await want_read(sock) - return sock.accept() - - # noinspection PyMethodMayBeStatic - async def sock_sendall(self, sock: socket.socket, data: bytes): - """ - Sends all the passed bytes trough the given socket, asynchronously - - :param sock: The socket that must be written - :type sock: socket.socket - :param data: The bytes to send across the socket - :type data: bytes - """ - - while data: - await want_write(sock) - try: - sent_no = sock.send(data) - except BlockingIOError: - await want_write(sock) - sent_no = sock.send(data) - data = data[sent_no:] - - async def close_sock(self, sock: socket.socket): - """ - Closes the given socket asynchronously - - :param sock: The socket that must be closed - :type sock: socket.socket - """ - - await want_write(sock) - try: - sock.close() - except BlockingIOError: - await want_write(sock) - sock.close() - self.selector.unregister(sock) - self.current_task.last_io = () - # noinspection PyMethodMayBeStatic async def connect_sock(self, sock: socket.socket, address_tuple: tuple): """ diff --git a/giambio/objects.py b/giambio/objects.py index 23c8938..952a655 100644 --- a/giambio/objects.py +++ b/giambio/objects.py @@ -203,8 +203,8 @@ class TimeQueue: or -1 if it is not present """ - for i in self.container: - if i[2] == item: + for i, e in enumerate(self.container): + if e[2] == item: return i return -1 @@ -222,8 +222,8 @@ class TimeQueue: """ idx = self.index(item) - if idx != 1: - self.container.remove(idx) + if idx != -1: + self.container.pop(idx) heapify(self.container) def get_closest_deadline(self) -> float: @@ -333,8 +333,8 @@ class DeadlinesQueue: or -1 if it is not present """ - for i in self.container: - if i[2] == item: + for i, e in enumerate(self.container): + if e[2] == item: return i return -1 @@ -353,7 +353,7 @@ class DeadlinesQueue: idx = self.index(item) if idx != 1: - self.container.remove(idx) + self.container.pop(idx) heapify(self.container) def get_closest_deadline(self) -> float: diff --git a/giambio/run.py b/giambio/run.py index 6ad3e71..babee34 100644 --- a/giambio/run.py +++ b/giambio/run.py @@ -91,7 +91,7 @@ def create_pool(): """ loop = get_event_loop() - pool = TaskManager(loop) + pool = TaskManager() loop.current_pool = pool return pool @@ -102,6 +102,9 @@ def with_timeout(timeout: int or float): """ loop = get_event_loop() - pool = TaskManager(loop, timeout) + # We add 1 to make the timeout intuitive and inclusive (i.e. + # a 10 seconds timeout means the task is allowed to run 10 + # whole seconds instead of cancelling at the tenth second) + pool = TaskManager(timeout + 1) loop.current_pool = pool return pool diff --git a/giambio/socket.py b/giambio/socket.py index eaec16d..ddb8fe3 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -15,9 +15,10 @@ See the License for the specific language governing permissions and limitations under the License. 
""" -import socket +import socket as builtin_socket from giambio.run import get_event_loop from giambio.exceptions import ResourceClosed +from giambio.traps import want_write, want_read class AsyncSocket: @@ -25,7 +26,7 @@ class AsyncSocket: Abstraction layer for asynchronous sockets """ - def __init__(self, sock: socket.socket): + def __init__(self, sock: builtin_socket.socket): self.sock = sock self.loop = get_event_loop() self._closed = False @@ -38,7 +39,13 @@ class AsyncSocket: if self._closed: raise ResourceClosed("I/O operation on closed socket") - return await self.loop.read_sock(self.sock, max_size) + assert max_size >= 1, "max_size must be >= 1" + await want_read(self.sock) + try: + return self.sock.recv(max_size) + except BlockingIOError: + await want_read(self.sock) + return self.sock.recv(max_size) async def accept(self): """ @@ -47,7 +54,16 @@ class AsyncSocket: if self._closed: raise ResourceClosed("I/O operation on closed socket") - to_wrap = await self.loop.accept_sock(self.sock) + await want_read(self.sock) + try: + to_wrap = self.sock.accept() + except BlockingIOError: + # Some platforms (namely OSX systems) act weird and handle + # the errno 35 signal (EAGAIN) for sockets in a weird manner, + # and this seems to fix the issue. Not sure about why since we + # already called want_read above, but it ain't stupid if it works I guess + await want_read(self.sock) + to_wrap = self.sock.accept() return wrap_socket(to_wrap[0]), to_wrap[1] async def send_all(self, data: bytes): @@ -57,7 +73,14 @@ class AsyncSocket: if self._closed: raise ResourceClosed("I/O operation on closed socket") - return await self.loop.sock_sendall(self.sock, data) + while data: + await want_write(self.sock) + try: + sent_no = self.sock.send(data) + except BlockingIOError: + await want_write(self.sock) + sent_no = self.sock.send(data) + data = data[sent_no:] async def close(self): """ @@ -66,7 +89,14 @@ class AsyncSocket: if self._closed: raise ResourceClosed("I/O operation on closed socket") - await self.loop.close_sock(self.sock) + await want_write(self.sock) + try: + self.sock.close() + except BlockingIOError: + await want_write(self.sock) + self.sock.close() + self.loop.selector.unregister(self.sock) + self.loop.current_task.last_io = () self._closed = True async def connect(self, addr: tuple): @@ -76,7 +106,36 @@ class AsyncSocket: if self._closed: raise ResourceClosed("I/O operation on closed socket") - await self.loop.connect_sock(self.sock, addr) + await want_write(self.sock) + try: + self.sock.connect(addr) + except BlockingIOError: + await want_write(self.sock) + self.sock.connect(addr) + + async def bind(self, addr: tuple): + """ + Binds the socket to an address + + :param addr: The address, port tuple to bind to + :type addr: tuple + """ + + if self._closed: + raise ResourceClosed("I/O operation on closed socket") + self.sock.bind(addr) + + async def listen(self, backlog: int): + """ + Starts listening with the given backlog + + :param backlog: The address, port tuple to bind to + :type backlog: int + """ + + if self._closed: + raise ResourceClosed("I/O operation on closed socket") + self.sock.listen(backlog) def __del__(self): """ @@ -100,9 +159,20 @@ class AsyncSocket: return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})" -def wrap_socket(sock: socket.socket) -> AsyncSocket: +def wrap_socket(sock: builtin_socket.socket) -> AsyncSocket: """ Wraps a standard socket into an async socket """ return AsyncSocket(sock) + + +def socket(*args, **kwargs): + """ + Creates a new giambio socket, 
taking in the same positional and + keyword arguments as the standard library's socket.socket + constructor + """ + + return AsyncSocket(builtin_socket.socket(*args, **kwargs)) + diff --git a/tests/server.py b/tests/server.py index ad4b79c..4981959 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1,7 +1,5 @@ import giambio from giambio.socket import AsyncSocket -from debugger import Debugger -import socket import logging import sys @@ -16,14 +14,13 @@ async def serve(bind_address: tuple): (address, port) where address is a string and port is an integer """ - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(bind_address) - sock.listen(5) - async_sock = giambio.wrap_socket(sock) # We make the socket an async socket + sock = giambio.socket.socket() + await sock.bind(bind_address) + await sock.listen(5) logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}") async with giambio.create_pool() as pool: while True: - conn, address_tuple = await async_sock.accept() + conn, address_tuple = await sock.accept() logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected") pool.spawn(handler, conn, address_tuple) @@ -61,7 +58,7 @@ if __name__ == "__main__": port = int(sys.argv[1]) if len(sys.argv) > 1 else 1501 logging.basicConfig(level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p") try: - giambio.run(serve, ("localhost", port), debugger=None) + giambio.run(serve, ("localhost", port)) except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! if isinstance(error, KeyboardInterrupt): logging.info("Ctrl+C detected, exiting") diff --git a/tests/timeout.py b/tests/timeout.py index e7e6267..4c700de 100644 --- a/tests/timeout.py +++ b/tests/timeout.py @@ -11,17 +11,13 @@ async def child(name: int): async def main(): start = giambio.clock() try: - async with giambio.with_timeout(6) as pool: - # TODO: We need to consider the inner part of - # the with block as an implicit task, otherwise - # timeouts and cancellations won't work with await fn()! - pool.spawn(child, 5) # This will complete - pool.spawn(child, 10) # This will not - print("[main] Children spawned, awaiting completion") + async with giambio.with_timeout(10) as pool: + pool.spawn(child, 7) # This will complete + await child(20) # TODO: Broken except giambio.exceptions.TooSlowError: print("[main] One or more children have timed out!") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") if __name__ == "__main__": - giambio.run(main, debugger=()) + giambio.run(main, debugger=Debugger())
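
Usage sketch (illustrative only, not part of the patch itself): a minimal client for the `giambio.socket` API added above. It assumes a server such as `tests/server.py` is accepting connections on localhost:1501, and it only uses calls that appear in this diff (`giambio.socket.socket()`, `connect()`, `send_all()`, `close()`).

```python
import giambio


async def client(address: tuple):
    # giambio.socket.socket() returns an AsyncSocket wrapping a plain TCP socket;
    # it must be created inside a running giambio task, since the wrapper
    # grabs the current event loop on construction
    sock = giambio.socket.socket()
    # Assumes a server (e.g. tests/server.py) is listening on this address
    await sock.connect(address)
    # send_all() keeps writing until the whole buffer has been sent
    await sock.send_all(b"hello from giambio!\n")
    await sock.close()


if __name__ == "__main__":
    giambio.run(client, ("localhost", 1501))
```

Receiving data would go through the socket's receive method, whose signature does not appear in these hunks, so it is left out of the sketch.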