From 95a9300da85af7caceed4491470a8a8588108398 Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Thu, 3 Jun 2021 16:34:26 +0200 Subject: [PATCH] Moved some code around, added more socket methods and fixed I/O for SSL sockets (hopefully) --- README.md | 51 ++++++++- giambio/core.py | 249 ++++++++++++++++++++++++++------------------ giambio/objects.py | 22 +++- giambio/socket.py | 186 +++++++++++++++++++++------------ giambio/traps.py | 17 ++- tests/exceptions.py | 2 +- tests/server.py | 1 + 7 files changed, 349 insertions(+), 179 deletions(-) diff --git a/README.md b/README.md index bd2e618..e4be589 100644 --- a/README.md +++ b/README.md @@ -10,21 +10,62 @@ production ready, so be aware that it is likely (if not guaranteed) that you'll # Disclaimer -Right now this is nothing more than a toy implementation to help me understand how this whole `async`/`await` thing works -and it is pretty much guaranteed to explode spectacularly badly while using it. If you find any bugs, please report them! - This project was hugely inspired by the [curio](https://github.com/dabeaz/curio) and the [trio](https://github.com/python-trio/trio) projects, you might want to have a look at their amazing work if you need a rock-solid and structured concurrency framework (I personally recommend trio and that's definitely not related to the fact that most of the content of this document is ~~stolen~~ inspired from its documentation) +## Goals of this project + +Making yet another async library might sound dumb in an already fragmented ecosystem like Python's. +In fact, giambio was initially born as a fun toy project to help me understand how this whole async/await magic actually worked, but while I researched this topic +further I found some issues with the current async ecosystem in Python. +As of the time of writing, the ecosystem for async libraries is divided as follows: + - Asyncio. Since it's in the stdlib, it sets a standard all of its own + - Tornado/Gevent/other old frameworks (based partly on asyncio or not) + - Modern, post PEP 492 frameworks like curio and trio + +The main issue with asyncio is too complex to explain here in detail, but in short +it boils down to the fact that it is an old library which was not designed to take advantage +of the new async/await features natively and uses callbacks instead. There is a compatibility +layer to use async/await, but that only causes more problems because it still runs on top of the legacy +callback-based code (and it can't be used always, anyway). Asyncio has also a bunch +of problems with exception propagation and cancellation, which is an issue shared +by other old libraries like tornado and gevent. + +To address this problem, libraries like trio and curio were born, implementing a new +paradigm called _Structured Concurrency_, which makes the async model much easier to use and +reason about. The two main players in this space are trio and curio. + +Trio is an amazing library, probably the most advanced I've ever used, but for this exact +reason it has 2 main issues: +- The code is extremely intimidating to look at (without needing to be, read below) +- It has a lot, and I mean a LOT, of layers of indirections and fancy features that are useful, but also slow down execution + + +Curio has its own set of issues, namely: +- It allows orphaned tasks (i.e. tasks not spawned trough a `curio.TaskPool`), so it partially breaks structured concurrency +- It is not a community project, sadly +- Big chunks of code are completely undocumented: curio's loop is basically a black box to external code (and that's a design choice) + +What I did love about curio though, is that its code is understandable once you go down the "writing an async scheduler" rabbithole, and it is in +fact my main source of ispiration for writing giambio as of now. Curio is also around 2 times faster than both trio and asyncio, according to benchmarks. + +Giambio means to take the best of all of its predecessors, while being: +- Fully documented and type hinted for 100% editor coverage +- Small, but featureful +- Fast, possibly as fast as curio, if not better +- Dependency-free: No fancy C modules, no external libraries, just pure idiomatic Python code +- Community-based: I frankly wouldn't have bothered making this if curio was open to community additions + ## Current limitations -As I already mentioned, giambio is **highly** experimental and there's a lot to work to do before it's usable. Namely: +giambio is **highly** experimental and there's a lot to work to do before it's usable. Namely: - Ensure cancellations work 100% of the time even when `await`ing functions and not spawning them - Extend I/O functionality -- Add task synchronization primitives such as locks and semaphores (events *sort of* work now) +- Add other task synchronization primitives such as locks and semaphores +- Documentation # What the hell is async anyway? diff --git a/giambio/core.py b/giambio/core.py index 22afb8f..1d4e07d 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -34,28 +34,45 @@ from giambio.exceptions import (InternalError, TooSlowError ) -# TODO: Right now this value is pretty much arbitrary, we need some testing to choose a sensible default -IO_SKIP_LIMIT = 5 class AsyncScheduler: """ - A simple asynchronous scheduler implementation. Tries to mimic the threaded - model in its simplicity, without using actual threads, but rather alternating + A simple task scheduler implementation that tries to mimic thread programming + in its simplicity, without using actual threads, but rather alternating across coroutines execution to let more than one thing at a time to proceed with its calculations. An attempt to fix the threaded model has been made - without making the API unnecessarily complicated. A few examples are tasks - cancellation and exception propagation. + without making the API unnecessarily complicated. + + This loop only provides the most basic support for task scheduling, I/O + multiplexing, event delivery, task cancellation and exception propagation: + any other feature should therefore be implemented in higher-level object + wrappers (see socket.py and event.py for example). An object wrapper should + not depend on the loop's implementation details such as internal state or + directly access its methods: traps should be used instead; This is to + ensure that the wrapper will keep working even if the scheduler giambio + is using changes, which means it is entirely possible, and reasonable, to + write your own event loop and run giambio on top of it, provided the required + traps are correctly implemented. :param clock: A callable returning monotonically increasing values at each call, - defaults to timeit.default_timer + defaults to timeit.default_timer :type clock: :class: types.FunctionType :param debugger: A subclass of giambio.util.BaseDebugger or None if no debugging output is desired, defaults to None :type debugger: :class: giambio.util.BaseDebugger + :param selector: The selector to use for I/O multiplexing, defaults to selectors.DefaultSelector + :param io_skip_limit: The max. amount of times I/O checks can be skipped when + there are tasks to run. This makes sure that highly concurrent systems do not starve + I/O waiting tasks. Defaults to 5 + :type io_skip_limit: int, optional + :param io_max_timeout: The max. amount of seconds to pause for an I/O timeout. + Keep in mind that this timeout is only valid if there are no deadlines happening before + the timeout expires. Defaults to 86400 (1 day) + :type io_max_timeout: int, optional """ - def __init__(self, clock: types.FunctionType = default_timer, debugger: BaseDebugger = None): + def __init__(self, clock: types.FunctionType = default_timer, debugger: Optional[BaseDebugger] = None, selector: Optional[Any] = None, io_skip_limit: Optional[int] = None, io_max_timeout: Optional[int] = None): """ Object constructor """ @@ -79,8 +96,6 @@ class AsyncScheduler: self.paused: TimeQueue = TimeQueue(self.clock) # All active Event objects self.events: Set[Event] = set() - # Data to send back to a trap - self.to_send: Optional[Any] = None # Have we ever ran? self.has_ran: bool = False # The current pool @@ -91,6 +106,13 @@ class AsyncScheduler: self.io_skip: int = 0 # A heap queue of deadlines to be checked self.deadlines: DeadlinesQueue = DeadlinesQueue() + # Data to send back to a trap + self._data: Optional[Any] = None + # The I/O skip limit. TODO: Back up this value with euristics + self.io_skip_limit = io_skip_limit + # The max. I/O timeout + self.io_max_timeout = io_max_timeout + def done(self) -> bool: """ @@ -111,10 +133,20 @@ class AsyncScheduler: def run(self): """ - Starts the loop and 'listens' for events until there is work to do, - then exits. This behavior kinda resembles a kernel, as coroutines can - request the loop's functionality only trough some fixed entry points, - which in turn yield and give execution control to the loop itself. + The event loop's runner function. This method drives + execution for the entire framework and orchestrates I/O, + events, sleeping, cancellations and deadlines, but the + actual functionality for all of that is implemented in + object wrappers (see socket.py or event.py for example). + + This keeps the size of this module to a minimum while + allowing anyone to replace it with their own, as long + as the traps required by higher-level giambio objects + are implemented. If you want to add features to the + library, don't add them here, but take inspiration + from the current object wrappers (i.e. not depending + on any implementation detail from the loop other than + traps) """ while True: @@ -126,9 +158,9 @@ class AsyncScheduler: self.close() break elif not self.tasks: - # If there are no actively running tasks, we start by checking - # for I/O. This method will wait for I/O until the closest deadline - # to avoid starving sleeping tasks + # If there are no actively running tasks, we start by + # checking for I/O. This method will wait for I/O until + # the closest deadline to avoid starving sleeping tasks if self.selector.get_map(): self.check_io() if self.deadlines: @@ -145,50 +177,11 @@ class AsyncScheduler: # The deadlines queue will internally make sure not to store # a deadline for the same pool twice. This makes the timeouts # model less flexible, because one can't change the timeout - # after it is set, but it makes the implementation easier. + # after it is set, but it makes the implementation easier self.deadlines.put(self.current_pool) # Otherwise, while there are tasks ready to run, we run them! while self.tasks: - # Sets the currently running task - self.current_task = self.tasks.pop(0) - if self.current_task.done(): - # Since ensure_discard only checks for paused tasks, - # we still need to make sure we don't try to execute - # exited tasks that are on the running queue - continue - self.debugger.before_task_step(self.current_task) - if self.current_task.cancel_pending: - # We perform the deferred cancellation - # if it was previously scheduled - self.cancel(self.current_task) - if self.to_send and self.current_task.status != "init": - # A little setup to send objects from and to - # coroutines outside the event loop - data = self.to_send - else: - # The first time coroutines' method .send() wants None! - data = None - # Run a single step with the calculation (i.e. until a yield - # somewhere) - method, *args = self.current_task.run(data) - # Some debugging and internal chatter here - self.current_task.status = "run" - self.current_task.steps += 1 - self.debugger.after_task_step(self.current_task) - # If data has been sent, reset it to None - if self.to_send and self.current_task != "init": - self.to_send = None - # Sneaky method call, thanks to David Beazley for this ;) - if not hasattr(self, method): - # If this happens, that's quite bad! - # This if block is meant to be triggered by other async - # libraries, which most likely have different trap names and behaviors - # compared to us. If you get this exception and you're 100% sure you're - # not mixing async primitives from other libraries, then it's a bug! - raise InternalError("Uh oh! Something very bad just happened, did" - " you try to mix primitives from other async libraries?") from None - - getattr(self, method)(*args) + self.run_task_step() except StopIteration as ret: # At the end of the day, coroutines are generator functions with # some tricky behaviors, and this is one of them. When a coroutine @@ -201,6 +194,7 @@ class AsyncScheduler: self.current_task.result = ret.value self.current_task.finished = True self.debugger.on_task_exit(self.current_task) + self.io_release_task(self.current_task) self.join(self.current_task) except BaseException as err: # TODO: We might want to do a bit more complex traceback hacking to remove any extra @@ -210,8 +204,79 @@ class AsyncScheduler: self.current_task.exc.__traceback__ = self.current_task.exc.__traceback__.tb_next self.current_task.status = "crashed" self.debugger.on_exception_raised(self.current_task, err) + self.io_release_task(self.current_task) self.join(self.current_task) - self.ensure_discard(self.current_task) + + def run_task_step(self): + """ + Runs a single step for the current task. + A step ends when the task awaits any of + giambio's primitives or async methods. + + Note that this method does NOT catch any + exception arising from tasks, nor does it + take StopIteration or CancelledError into + account, that's self.run's job! + """ + + # Sets the currently running task + self.current_task = self.tasks.pop(0) + if self.current_task.done(): + # We need to make sure we don't try to execute + # exited tasks that are on the running queue + return + self.debugger.before_task_step(self.current_task) + if self.current_task.cancel_pending: + # We perform the deferred cancellation + # if it was previously scheduled + self.cancel(self.current_task) + # Little boilerplate to send data back to an async trap + data = None + if self.current_task.status != "init": + data = self._data + # Run a single step with the calculation (i.e. until a yield + # somewhere) + method, *args = self.current_task.run(data) + self._data = None + # Some debugging and internal chatter here + self.current_task.status = "run" + self.current_task.steps += 1 + self.debugger.after_task_step(self.current_task) + if not hasattr(self, method): + # If this happens, that's quite bad! + # This if block is meant to be triggered by other async + # libraries, which most likely have different trap names and behaviors + # compared to us. If you get this exception and you're 100% sure you're + # not mixing async primitives from other libraries, then it's a bug! + raise InternalError("Uh oh! Something very bad just happened, did" + " you try to mix primitives from other async libraries?") from None + + # Sneaky method call, thanks to David Beazley for this ;) + getattr(self, method)(*args) + + + def io_release_task(self, task: Task): + """ + Calls self.io_release in a loop + for each I/O resource the given task owns + """ + + if self.selector.get_map(): + for k in filter(lambda o: o.data == self.current_task, + dict(self.selector.get_map()).values()): + self.io_release(k.fileobj) + task.last_io = () + + def io_release(self, sock): + """ + Releases the given resource from our + selector. + + :param sock: The resource to be released + """ + + if self.selector.get_map() and sock in self.selector.get_map(): + self.selector.unregister(sock) def do_cancel(self, task: Task): """ @@ -223,15 +288,17 @@ class AsyncScheduler: """ self.debugger.before_cancel(task) - task.throw(CancelledError()) + error = CancelledError() + error.task = task + task.throw(error) - def get_current(self): + def get_current(self) -> Task: """ - Returns the current task to an async caller + 'Returns' the current task to an async caller """ + self._data = self.current_task self.tasks.append(self.current_task) - self.to_send = self.current_task def expire_deadlines(self): """ @@ -272,7 +339,6 @@ class AsyncScheduler: # Reschedules tasks when their deadline has elapsed task = self.paused.get() slept = self.clock() - task.sleep_start - task.sleep_start = 0.0 self.tasks.append(task) self.debugger.after_sleep(task, slept) @@ -305,15 +371,15 @@ class AsyncScheduler: before_time = self.clock() # Used for the debugger if self.tasks or self.events: - # If there is work to do immediately we prefer to + # If there is work to do immediately (tasks to run) we prefer to # do that first unless some conditions are met, see below self.io_skip += 1 - if self.io_skip == IO_SKIP_LIMIT: + if self.io_skip == self.io_skip_limit: # We can't skip every time there's some task ready - # or else we might starve I/O waiting tasks - # when a lot of things are running at the same time + # or else we might starve I/O waiting tasks when a + # lot of things are running at the same time self.io_skip = 0 - timeout = 86400 + timeout = self.io_max_timeout else: # If there are either tasks or events and no I/O, don't wait # (unless we already skipped this check too many times) @@ -323,7 +389,7 @@ class AsyncScheduler: timeout = self.get_closest_deadline() else: # If there is *only* I/O, we wait a fixed amount of time - timeout = 86400 # Stolen from trio :D + timeout = self.io_max_timeout self.debugger.before_io(timeout) io_ready = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks @@ -405,27 +471,6 @@ class AsyncScheduler: self.get_io_tasks(), [self.current_task]) - def ensure_discard(self, task: Task): - """ - Ensures that tasks that need to be cancelled are not - rescheduled further. This method exists because tasks might - be cancelled in a context where it's not obvious which Task - object must be discarded and not rescheduled at the next iteration - """ - - # TODO: Do we need else ifs or ifs? The question arises because - # tasks might be doing I/O and other stuff too even if not at the same time - if task in self.paused: - self.paused.discard(task) - if self.selector.get_map(): - for key in dict(self.selector.get_map()).values(): - if key.data == task: - self.selector.unregister(key.fileobj) - if task in self.get_event_tasks(): - for evt in self.events: - if task in evt.waiters: - evt.waiters.remove(task) - def cancel_all(self) -> bool: """ Cancels ALL tasks as returned by self.get_all_tasks() and returns @@ -538,19 +583,25 @@ class AsyncScheduler: or does so straight away if it is safe to do so """ - if task.done(): - self.ensure_discard(task) + if task.done() or task.status == "init": # The task isn't running already! + task.cancel_pending = False return - elif task.status in ("io", "sleep", "init"): + elif task.status in ("io", "sleep"): # We cancel immediately only in a context where it's safer to do # so. The concept of "safer" is quite tricky, because even though the # task is technically not running, it might leave some unfinished state # or dangling resource open after being cancelled, so maybe we need # a different approach altogether + if task.status == "io": + for k in filter(lambda o: o.data == task, + dict(self.selector.get_map()).values()): + self.selector.unregister(k.fileobj) + elif task.status == "sleep": + self.paused.discard(task) try: self.do_cancel(task) - except CancelledError: + except CancelledError as cancel: # When a task needs to be cancelled, giambio tries to do it gracefully # first: if the task is paused in either I/O or sleeping, that's perfect. # But we also need to cancel a task if it was not sleeping or waiting on @@ -560,11 +611,12 @@ class AsyncScheduler: # to re-raise cancellations at every checkpoint until the task lets the # exception propagate into us, because we *really* want the task to be # cancelled - task.status = "cancelled" - task.cancelled = True + task = cancel.task task.cancel_pending = False + task.cancelled = True + task.status = "cancelled" + self.io_release_task(self.current_task) self.debugger.after_cancel(task) - self.ensure_discard(task) else: # If we can't cancel in a somewhat "graceful" way, we just # defer this operation for later (check run() for more info) @@ -606,7 +658,6 @@ class AsyncScheduler: :param sock: The socket on which a read or write operation has to be performed - :type sock: socket.socket :param evt_type: The type of event to perform on the given socket, either "read" or "write" :type evt_type: str diff --git a/giambio/objects.py b/giambio/objects.py index 952a655..07c9546 100644 --- a/giambio/objects.py +++ b/giambio/objects.py @@ -51,8 +51,10 @@ class Task: # time by the event loop, internally. Possible values for this are "init"-- # when the task has been created but not started running yet--, "run"-- when # the task is running synchronous code--, "io"-- when the task is waiting on - # an I/O resource--, "sleep"-- when the task is either asleep or waiting on an event - # and "crashed"-- when the task has exited because of an exception + # an I/O resource--, "sleep"-- when the task is either asleep or waiting on + # an event, "crashed"-- when the task has exited because of an exception + # and "cancelled" when-- when the task has been explicitly cancelled with + # its cancel() method or as a result of an exception status: str = "init" # This attribute counts how many times the task's run() method has been called steps: int = 0 @@ -130,6 +132,17 @@ class Task: return self.exc or self.finished or self.cancelled + def __del__(self): + """ + Task destructor + """ + + try: + self.coroutine.close() + except RuntimeError: + pass # TODO: This is kinda bad + assert not self.last_io + class Event: """ @@ -143,7 +156,6 @@ class Event: self.set = False self.waiters = [] - self.event_caught = False async def trigger(self): """ @@ -169,8 +181,8 @@ class TimeQueue: sleeping tasks will be put when they are not running :param clock: The same monotonic clock that was passed to the thread-local event loop. - It is important for the queue to be synchronized with the loop as this allows - the sleeping mechanism to work reliably + It is important for the queue to be synchronized with the loop as this allows + the sleeping mechanism to work reliably """ def __init__(self, clock): diff --git a/giambio/socket.py b/giambio/socket.py index f1636fb..d3f4a22 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -16,13 +16,18 @@ limitations under the License. """ import ssl +from socket import SOL_SOCKET, SO_ERROR import socket as builtin_socket -from giambio.run import get_event_loop from giambio.exceptions import ResourceClosed from giambio.traps import want_write, want_read -from ssl import SSLWantReadError, SSLWantWriteError -IOInterrupt = (BlockingIOError, InterruptedError, SSLWantReadError, SSLWantWriteError) +try: + from ssl import SSLWantReadError, SSLWantWriteError + WantRead = (BlockingIOError, InterruptedError, SSLWantReadError) + WantWrite = (BlockingIOError, InterruptedError, SSLWantWriteError) +except ImportError: + WantRead = (BlockingIOError, InterruptedError) + WantWrite = (BlockingIOError, InterruptedError) class AsyncSocket: @@ -32,61 +37,55 @@ class AsyncSocket: def __init__(self, sock): self.sock = sock - self.loop = get_event_loop() - self._closed = False + self._fd = sock.fileno() self.sock.setblocking(False) - async def receive(self, max_size: int): + + async def receive(self, max_size: int, flags: int = 0) -> bytes: """ Receives up to max_size bytes from a socket asynchronously """ - if self._closed: - raise ResourceClosed("I/O operation on closed socket") assert max_size >= 1, "max_size must be >= 1" - if isinstance(self.sock, ssl.SSLSocket) and self.sock.pending(): - print(self.sock.pending()) - return self.sock.recv(self.sock.pending()) - await want_read(self.sock) - try: - return self.sock.recv(max_size) - except IOInterrupt: - await want_read(self.sock) - return self.sock.recv(max_size) + data = b"" + if self._fd == -1: + raise ResourceClosed("I/O operation on closed socket") + while True: + try: + return self.sock.recv(max_size, flags) + except WantRead: + await want_read(self.sock) + except WantWrite: + await want_write(self.sock) async def accept(self): """ Accepts the socket, completing the 3-step TCP handshake asynchronously """ - if self._closed: + if self.sock == -1: raise ResourceClosed("I/O operation on closed socket") - await want_read(self.sock) - try: - to_wrap = self.sock.accept() - except IOInterrupt: - # Some platforms (namely OSX systems) act weird and handle - # the errno 35 signal (EAGAIN) for sockets in a weird manner, - # and this seems to fix the issue. Not sure about why since we - # already called want_read above, but it ain't stupid if it works I guess - await want_read(self.sock) - to_wrap = self.sock.accept() - return wrap_socket(to_wrap[0]), to_wrap[1] + while True: + try: + remote, addr = self.sock.accept() + return wrap_socket(remote), addr + except WantRead: + await want_read(self.sock) - async def send_all(self, data: bytes): + async def send_all(self, data: bytes, flags: int = 0): """ Sends all data inside the buffer asynchronously until it is empty """ - if self._closed: + if self.sock == -1: raise ResourceClosed("I/O operation on closed socket") while data: - await want_write(self.sock) try: - sent_no = self.sock.send(data) - except IOInterrupt: + sent_no = self.sock.send(data, flags) + except WantRead: + await want_read(self.sock) + except WantWrite: await want_write(self.sock) - sent_no = self.sock.send(data) data = data[sent_no:] async def close(self): @@ -94,30 +93,26 @@ class AsyncSocket: Closes the socket asynchronously """ - if self._closed: + if self.sock == -1: raise ResourceClosed("I/O operation on closed socket") - await want_write(self.sock) - try: - self.sock.close() - except IOInterrupt: - await want_write(self.sock) - self.sock.close() - self.loop.selector.unregister(self.sock) - self.loop.current_task.last_io = () - self._closed = True + await release_sock(self.sock) + self.sock.close() + self._sock = None + self.sock = -1 async def connect(self, addr: tuple): """ Connects the socket to an endpoint """ - if self._closed: + if self.sock == -1: raise ResourceClosed("I/O operation on closed socket") - await want_write(self.sock) try: self.sock.connect(addr) - except IOInterrupt as io_interrupt: + except WantWrite: await want_write(self.sock) + self.sock.connect(addr) + async def bind(self, addr: tuple): """ @@ -127,7 +122,7 @@ class AsyncSocket: :type addr: tuple """ - if self._closed: + if self.sock == -1: raise ResourceClosed("I/O operation on closed socket") self.sock.bind(addr) @@ -139,29 +134,88 @@ class AsyncSocket: :type backlog: int """ - if self._closed: + if self.sock == -1: raise ResourceClosed("I/O operation on closed socket") self.sock.listen(backlog) - def __del__(self): - """ - Implements the destructor for the async socket, - notifying the event loop that the socket must not - be listened for anymore. This avoids the loop - blocking forever on trying to read from a socket - that's gone out of scope without being closed - """ - - if not self._closed and self.loop.selector.get_map() and self.sock in self.loop.selector.get_map(): - self.loop.selector.unregister(self.sock) - self.loop.current_task.last_io = () - self._closed = True - async def __aenter__(self): + self.sock.__enter__() return self - async def __aexit__(self, *_): - await self.close() + async def __aexit__(self, *args): + if self.sock: + self.sock.__exit__(*args) + + # Yes, I stole these from Curio because I could not be + # arsed to write a bunch of uninteresting simple socket + # methods from scratch, deal with it. + + async def connect(self, address): + try: + result = self.sock.connect(address) + if getattr(self, 'do_handshake_on_connect', False): + await self.do_handshake() + return result + except WantWrite: + await want_write(self.sock) + err = self.sock.getsockopt(SOL_SOCKET, SO_ERROR) + if err != 0: + raise OSError(err, f'Connect call failed {address}') + if getattr(self, 'do_handshake_on_connect', False): + await self.do_handshake() + + async def recvfrom(self, buffersize, flags=0): + while True: + try: + return self.sock.recvfrom(buffersize, flags) + except WantRead: + await want_read(self.sock) + except WantWrite: + await want_write(self.sock) + + async def recvfrom_into(self, buffer, bytes=0, flags=0): + while True: + try: + return self.sock.recvfrom_into(buffer, bytes, flags) + except WantRead: + await want_read(self.sock) + except WantWrite: + await want_write(self.sock) + + async def sendto(self, bytes, flags_or_address, address=None): + if address: + flags = flags_or_address + else: + address = flags_or_address + flags = 0 + while True: + try: + return self.sock.sendto(bytes, flags, address) + except WantWrite: + await want_write(self.sock) + except WantRead: + await want_read(self.sock) + + async def recvmsg(self, bufsize, ancbufsize=0, flags=0): + while True: + try: + return self.sock.recvmsg(bufsize, ancbufsize, flags) + except WantRead: + await want_read(self.sock) + + async def recvmsg_into(self, buffers, ancbufsize=0, flags=0): + while True: + try: + return self.sock.recvmsg_into(buffers, ancbufsize, flags) + except WantRead: + await want_read(self.sock) + + async def sendmsg(self, buffers, ancdata=(), flags=0, address=None): + while True: + try: + return self.sock.sendmsg(buffers, ancdata, flags, address) + except WantRead: + await want_write(self.sock) def __repr__(self): return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})" diff --git a/giambio/traps.py b/giambio/traps.py index 94ed2ce..f908fe1 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -38,14 +38,15 @@ def create_trap(method, *args): async def sleep(seconds: int): """ - Pause the execution of an async function for a given amount of seconds, - without blocking the entire event loop, which keeps watching for other events. + Pause the execution of an async function for a given amount of seconds. + This function is functionally equivalent to time.sleep, but can be used + within async code without blocking everything else. This function is also useful as a sort of checkpoint, because it returns control to the scheduler, which can then switch to another task. If your code doesn't have enough calls to async functions (or 'checkpoints') this might prevent the scheduler from switching tasks properly. If you feel like this - happens in your code, try adding a call to giambio.sleep(0) somewhere. + happens in your code, try adding a call to await giambio.sleep(0) somewhere. This will act as a checkpoint without actually pausing the execution of your function, but it will allow the scheduler to switch tasks @@ -57,6 +58,16 @@ async def sleep(seconds: int): await create_trap("sleep", seconds) +async def io_release(resource): + """ + Notifies the event loop to release + the resources associated to the given + socket and stop listening on it + """ + + await create_trap("io_release", resource) + + async def current_task(): """ Gets the currently running task in an asynchronous fashion diff --git a/tests/exceptions.py b/tests/exceptions.py index bd773a2..439e29d 100644 --- a/tests/exceptions.py +++ b/tests/exceptions.py @@ -30,4 +30,4 @@ async def main(): if __name__ == "__main__": - giambio.run(main, debugger=()) + giambio.run(main, debugger=Debugger()) diff --git a/tests/server.py b/tests/server.py index 4981959..849b17c 100644 --- a/tests/server.py +++ b/tests/server.py @@ -63,4 +63,5 @@ if __name__ == "__main__": if isinstance(error, KeyboardInterrupt): logging.info("Ctrl+C detected, exiting") else: + raise logging.error(f"Exiting due to a {type(error).__name__}: {error}")