mirror of https://github.com/nocturn9x/giambio.git
Moved some code around, added more socket methods and fixed I/O for SSL sockets (hopefully)
This commit is contained in:
parent
268745c552
commit
95a9300da8
51
README.md
51
README.md
|
@ -10,21 +10,62 @@ production ready, so be aware that it is likely (if not guaranteed) that you'll
|
|||
|
||||
# Disclaimer
|
||||
|
||||
Right now this is nothing more than a toy implementation to help me understand how this whole `async`/`await` thing works
|
||||
and it is pretty much guaranteed to explode spectacularly badly while using it. If you find any bugs, please report them!
|
||||
|
||||
This project was hugely inspired by the [curio](https://github.com/dabeaz/curio) and the
|
||||
[trio](https://github.com/python-trio/trio) projects, you might want to have a look at their amazing work if you need a
|
||||
rock-solid and structured concurrency framework (I personally recommend trio and that's definitely not related to the fact
|
||||
that most of the content of this document is ~~stolen~~ inspired from its documentation)
|
||||
|
||||
|
||||
## Goals of this project
|
||||
|
||||
Making yet another async library might sound dumb in an already fragmented ecosystem like Python's.
|
||||
In fact, giambio was initially born as a fun toy project to help me understand how this whole async/await magic actually worked, but while I researched this topic
|
||||
further I found some issues with the current async ecosystem in Python.
|
||||
As of the time of writing, the ecosystem for async libraries is divided as follows:
|
||||
- Asyncio. Since it's in the stdlib, it sets a standard all of its own
|
||||
- Tornado/Gevent/other old frameworks (based partly on asyncio or not)
|
||||
- Modern, post PEP 492 frameworks like curio and trio
|
||||
|
||||
The main issue with asyncio is too complex to explain here in detail, but in short
|
||||
it boils down to the fact that it is an old library which was not designed to take advantage
|
||||
of the new async/await features natively and uses callbacks instead. There is a compatibility
|
||||
layer to use async/await, but that only causes more problems because it still runs on top of the legacy
|
||||
callback-based code (and it can't be used always, anyway). Asyncio has also a bunch
|
||||
of problems with exception propagation and cancellation, which is an issue shared
|
||||
by other old libraries like tornado and gevent.
|
||||
|
||||
To address this problem, libraries like trio and curio were born, implementing a new
|
||||
paradigm called _Structured Concurrency_, which makes the async model much easier to use and
|
||||
reason about. The two main players in this space are trio and curio.
|
||||
|
||||
Trio is an amazing library, probably the most advanced I've ever used, but for this exact
|
||||
reason it has 2 main issues:
|
||||
- The code is extremely intimidating to look at (without needing to be, read below)
|
||||
- It has a lot, and I mean a LOT, of layers of indirections and fancy features that are useful, but also slow down execution
|
||||
|
||||
|
||||
Curio has its own set of issues, namely:
|
||||
- It allows orphaned tasks (i.e. tasks not spawned trough a `curio.TaskPool`), so it partially breaks structured concurrency
|
||||
- It is not a community project, sadly
|
||||
- Big chunks of code are completely undocumented: curio's loop is basically a black box to external code (and that's a design choice)
|
||||
|
||||
What I did love about curio though, is that its code is understandable once you go down the "writing an async scheduler" rabbithole, and it is in
|
||||
fact my main source of ispiration for writing giambio as of now. Curio is also around 2 times faster than both trio and asyncio, according to benchmarks.
|
||||
|
||||
Giambio means to take the best of all of its predecessors, while being:
|
||||
- Fully documented and type hinted for 100% editor coverage
|
||||
- Small, but featureful
|
||||
- Fast, possibly as fast as curio, if not better
|
||||
- Dependency-free: No fancy C modules, no external libraries, just pure idiomatic Python code
|
||||
- Community-based: I frankly wouldn't have bothered making this if curio was open to community additions
|
||||
|
||||
## Current limitations
|
||||
|
||||
As I already mentioned, giambio is **highly** experimental and there's a lot to work to do before it's usable. Namely:
|
||||
giambio is **highly** experimental and there's a lot to work to do before it's usable. Namely:
|
||||
- Ensure cancellations work 100% of the time even when `await`ing functions and not spawning them
|
||||
- Extend I/O functionality
|
||||
- Add task synchronization primitives such as locks and semaphores (events *sort of* work now)
|
||||
- Add other task synchronization primitives such as locks and semaphores
|
||||
- Documentation
|
||||
|
||||
# What the hell is async anyway?
|
||||
|
||||
|
|
247
giambio/core.py
247
giambio/core.py
|
@ -34,18 +34,26 @@ from giambio.exceptions import (InternalError,
|
|||
TooSlowError
|
||||
)
|
||||
|
||||
# TODO: Right now this value is pretty much arbitrary, we need some testing to choose a sensible default
|
||||
IO_SKIP_LIMIT = 5
|
||||
|
||||
|
||||
class AsyncScheduler:
|
||||
"""
|
||||
A simple asynchronous scheduler implementation. Tries to mimic the threaded
|
||||
model in its simplicity, without using actual threads, but rather alternating
|
||||
A simple task scheduler implementation that tries to mimic thread programming
|
||||
in its simplicity, without using actual threads, but rather alternating
|
||||
across coroutines execution to let more than one thing at a time to proceed
|
||||
with its calculations. An attempt to fix the threaded model has been made
|
||||
without making the API unnecessarily complicated. A few examples are tasks
|
||||
cancellation and exception propagation.
|
||||
without making the API unnecessarily complicated.
|
||||
|
||||
This loop only provides the most basic support for task scheduling, I/O
|
||||
multiplexing, event delivery, task cancellation and exception propagation:
|
||||
any other feature should therefore be implemented in higher-level object
|
||||
wrappers (see socket.py and event.py for example). An object wrapper should
|
||||
not depend on the loop's implementation details such as internal state or
|
||||
directly access its methods: traps should be used instead; This is to
|
||||
ensure that the wrapper will keep working even if the scheduler giambio
|
||||
is using changes, which means it is entirely possible, and reasonable, to
|
||||
write your own event loop and run giambio on top of it, provided the required
|
||||
traps are correctly implemented.
|
||||
|
||||
:param clock: A callable returning monotonically increasing values at each call,
|
||||
defaults to timeit.default_timer
|
||||
|
@ -53,9 +61,18 @@ class AsyncScheduler:
|
|||
:param debugger: A subclass of giambio.util.BaseDebugger or None if no debugging output
|
||||
is desired, defaults to None
|
||||
:type debugger: :class: giambio.util.BaseDebugger
|
||||
:param selector: The selector to use for I/O multiplexing, defaults to selectors.DefaultSelector
|
||||
:param io_skip_limit: The max. amount of times I/O checks can be skipped when
|
||||
there are tasks to run. This makes sure that highly concurrent systems do not starve
|
||||
I/O waiting tasks. Defaults to 5
|
||||
:type io_skip_limit: int, optional
|
||||
:param io_max_timeout: The max. amount of seconds to pause for an I/O timeout.
|
||||
Keep in mind that this timeout is only valid if there are no deadlines happening before
|
||||
the timeout expires. Defaults to 86400 (1 day)
|
||||
:type io_max_timeout: int, optional
|
||||
"""
|
||||
|
||||
def __init__(self, clock: types.FunctionType = default_timer, debugger: BaseDebugger = None):
|
||||
def __init__(self, clock: types.FunctionType = default_timer, debugger: Optional[BaseDebugger] = None, selector: Optional[Any] = None, io_skip_limit: Optional[int] = None, io_max_timeout: Optional[int] = None):
|
||||
"""
|
||||
Object constructor
|
||||
"""
|
||||
|
@ -79,8 +96,6 @@ class AsyncScheduler:
|
|||
self.paused: TimeQueue = TimeQueue(self.clock)
|
||||
# All active Event objects
|
||||
self.events: Set[Event] = set()
|
||||
# Data to send back to a trap
|
||||
self.to_send: Optional[Any] = None
|
||||
# Have we ever ran?
|
||||
self.has_ran: bool = False
|
||||
# The current pool
|
||||
|
@ -91,6 +106,13 @@ class AsyncScheduler:
|
|||
self.io_skip: int = 0
|
||||
# A heap queue of deadlines to be checked
|
||||
self.deadlines: DeadlinesQueue = DeadlinesQueue()
|
||||
# Data to send back to a trap
|
||||
self._data: Optional[Any] = None
|
||||
# The I/O skip limit. TODO: Back up this value with euristics
|
||||
self.io_skip_limit = io_skip_limit
|
||||
# The max. I/O timeout
|
||||
self.io_max_timeout = io_max_timeout
|
||||
|
||||
|
||||
def done(self) -> bool:
|
||||
"""
|
||||
|
@ -111,10 +133,20 @@ class AsyncScheduler:
|
|||
|
||||
def run(self):
|
||||
"""
|
||||
Starts the loop and 'listens' for events until there is work to do,
|
||||
then exits. This behavior kinda resembles a kernel, as coroutines can
|
||||
request the loop's functionality only trough some fixed entry points,
|
||||
which in turn yield and give execution control to the loop itself.
|
||||
The event loop's runner function. This method drives
|
||||
execution for the entire framework and orchestrates I/O,
|
||||
events, sleeping, cancellations and deadlines, but the
|
||||
actual functionality for all of that is implemented in
|
||||
object wrappers (see socket.py or event.py for example).
|
||||
|
||||
This keeps the size of this module to a minimum while
|
||||
allowing anyone to replace it with their own, as long
|
||||
as the traps required by higher-level giambio objects
|
||||
are implemented. If you want to add features to the
|
||||
library, don't add them here, but take inspiration
|
||||
from the current object wrappers (i.e. not depending
|
||||
on any implementation detail from the loop other than
|
||||
traps)
|
||||
"""
|
||||
|
||||
while True:
|
||||
|
@ -126,9 +158,9 @@ class AsyncScheduler:
|
|||
self.close()
|
||||
break
|
||||
elif not self.tasks:
|
||||
# If there are no actively running tasks, we start by checking
|
||||
# for I/O. This method will wait for I/O until the closest deadline
|
||||
# to avoid starving sleeping tasks
|
||||
# If there are no actively running tasks, we start by
|
||||
# checking for I/O. This method will wait for I/O until
|
||||
# the closest deadline to avoid starving sleeping tasks
|
||||
if self.selector.get_map():
|
||||
self.check_io()
|
||||
if self.deadlines:
|
||||
|
@ -145,50 +177,11 @@ class AsyncScheduler:
|
|||
# The deadlines queue will internally make sure not to store
|
||||
# a deadline for the same pool twice. This makes the timeouts
|
||||
# model less flexible, because one can't change the timeout
|
||||
# after it is set, but it makes the implementation easier.
|
||||
# after it is set, but it makes the implementation easier
|
||||
self.deadlines.put(self.current_pool)
|
||||
# Otherwise, while there are tasks ready to run, we run them!
|
||||
while self.tasks:
|
||||
# Sets the currently running task
|
||||
self.current_task = self.tasks.pop(0)
|
||||
if self.current_task.done():
|
||||
# Since ensure_discard only checks for paused tasks,
|
||||
# we still need to make sure we don't try to execute
|
||||
# exited tasks that are on the running queue
|
||||
continue
|
||||
self.debugger.before_task_step(self.current_task)
|
||||
if self.current_task.cancel_pending:
|
||||
# We perform the deferred cancellation
|
||||
# if it was previously scheduled
|
||||
self.cancel(self.current_task)
|
||||
if self.to_send and self.current_task.status != "init":
|
||||
# A little setup to send objects from and to
|
||||
# coroutines outside the event loop
|
||||
data = self.to_send
|
||||
else:
|
||||
# The first time coroutines' method .send() wants None!
|
||||
data = None
|
||||
# Run a single step with the calculation (i.e. until a yield
|
||||
# somewhere)
|
||||
method, *args = self.current_task.run(data)
|
||||
# Some debugging and internal chatter here
|
||||
self.current_task.status = "run"
|
||||
self.current_task.steps += 1
|
||||
self.debugger.after_task_step(self.current_task)
|
||||
# If data has been sent, reset it to None
|
||||
if self.to_send and self.current_task != "init":
|
||||
self.to_send = None
|
||||
# Sneaky method call, thanks to David Beazley for this ;)
|
||||
if not hasattr(self, method):
|
||||
# If this happens, that's quite bad!
|
||||
# This if block is meant to be triggered by other async
|
||||
# libraries, which most likely have different trap names and behaviors
|
||||
# compared to us. If you get this exception and you're 100% sure you're
|
||||
# not mixing async primitives from other libraries, then it's a bug!
|
||||
raise InternalError("Uh oh! Something very bad just happened, did"
|
||||
" you try to mix primitives from other async libraries?") from None
|
||||
|
||||
getattr(self, method)(*args)
|
||||
self.run_task_step()
|
||||
except StopIteration as ret:
|
||||
# At the end of the day, coroutines are generator functions with
|
||||
# some tricky behaviors, and this is one of them. When a coroutine
|
||||
|
@ -201,6 +194,7 @@ class AsyncScheduler:
|
|||
self.current_task.result = ret.value
|
||||
self.current_task.finished = True
|
||||
self.debugger.on_task_exit(self.current_task)
|
||||
self.io_release_task(self.current_task)
|
||||
self.join(self.current_task)
|
||||
except BaseException as err:
|
||||
# TODO: We might want to do a bit more complex traceback hacking to remove any extra
|
||||
|
@ -210,8 +204,79 @@ class AsyncScheduler:
|
|||
self.current_task.exc.__traceback__ = self.current_task.exc.__traceback__.tb_next
|
||||
self.current_task.status = "crashed"
|
||||
self.debugger.on_exception_raised(self.current_task, err)
|
||||
self.io_release_task(self.current_task)
|
||||
self.join(self.current_task)
|
||||
self.ensure_discard(self.current_task)
|
||||
|
||||
def run_task_step(self):
|
||||
"""
|
||||
Runs a single step for the current task.
|
||||
A step ends when the task awaits any of
|
||||
giambio's primitives or async methods.
|
||||
|
||||
Note that this method does NOT catch any
|
||||
exception arising from tasks, nor does it
|
||||
take StopIteration or CancelledError into
|
||||
account, that's self.run's job!
|
||||
"""
|
||||
|
||||
# Sets the currently running task
|
||||
self.current_task = self.tasks.pop(0)
|
||||
if self.current_task.done():
|
||||
# We need to make sure we don't try to execute
|
||||
# exited tasks that are on the running queue
|
||||
return
|
||||
self.debugger.before_task_step(self.current_task)
|
||||
if self.current_task.cancel_pending:
|
||||
# We perform the deferred cancellation
|
||||
# if it was previously scheduled
|
||||
self.cancel(self.current_task)
|
||||
# Little boilerplate to send data back to an async trap
|
||||
data = None
|
||||
if self.current_task.status != "init":
|
||||
data = self._data
|
||||
# Run a single step with the calculation (i.e. until a yield
|
||||
# somewhere)
|
||||
method, *args = self.current_task.run(data)
|
||||
self._data = None
|
||||
# Some debugging and internal chatter here
|
||||
self.current_task.status = "run"
|
||||
self.current_task.steps += 1
|
||||
self.debugger.after_task_step(self.current_task)
|
||||
if not hasattr(self, method):
|
||||
# If this happens, that's quite bad!
|
||||
# This if block is meant to be triggered by other async
|
||||
# libraries, which most likely have different trap names and behaviors
|
||||
# compared to us. If you get this exception and you're 100% sure you're
|
||||
# not mixing async primitives from other libraries, then it's a bug!
|
||||
raise InternalError("Uh oh! Something very bad just happened, did"
|
||||
" you try to mix primitives from other async libraries?") from None
|
||||
|
||||
# Sneaky method call, thanks to David Beazley for this ;)
|
||||
getattr(self, method)(*args)
|
||||
|
||||
|
||||
def io_release_task(self, task: Task):
|
||||
"""
|
||||
Calls self.io_release in a loop
|
||||
for each I/O resource the given task owns
|
||||
"""
|
||||
|
||||
if self.selector.get_map():
|
||||
for k in filter(lambda o: o.data == self.current_task,
|
||||
dict(self.selector.get_map()).values()):
|
||||
self.io_release(k.fileobj)
|
||||
task.last_io = ()
|
||||
|
||||
def io_release(self, sock):
|
||||
"""
|
||||
Releases the given resource from our
|
||||
selector.
|
||||
|
||||
:param sock: The resource to be released
|
||||
"""
|
||||
|
||||
if self.selector.get_map() and sock in self.selector.get_map():
|
||||
self.selector.unregister(sock)
|
||||
|
||||
def do_cancel(self, task: Task):
|
||||
"""
|
||||
|
@ -223,15 +288,17 @@ class AsyncScheduler:
|
|||
"""
|
||||
|
||||
self.debugger.before_cancel(task)
|
||||
task.throw(CancelledError())
|
||||
error = CancelledError()
|
||||
error.task = task
|
||||
task.throw(error)
|
||||
|
||||
def get_current(self):
|
||||
def get_current(self) -> Task:
|
||||
"""
|
||||
Returns the current task to an async caller
|
||||
'Returns' the current task to an async caller
|
||||
"""
|
||||
|
||||
self._data = self.current_task
|
||||
self.tasks.append(self.current_task)
|
||||
self.to_send = self.current_task
|
||||
|
||||
def expire_deadlines(self):
|
||||
"""
|
||||
|
@ -272,7 +339,6 @@ class AsyncScheduler:
|
|||
# Reschedules tasks when their deadline has elapsed
|
||||
task = self.paused.get()
|
||||
slept = self.clock() - task.sleep_start
|
||||
task.sleep_start = 0.0
|
||||
self.tasks.append(task)
|
||||
self.debugger.after_sleep(task, slept)
|
||||
|
||||
|
@ -305,15 +371,15 @@ class AsyncScheduler:
|
|||
|
||||
before_time = self.clock() # Used for the debugger
|
||||
if self.tasks or self.events:
|
||||
# If there is work to do immediately we prefer to
|
||||
# If there is work to do immediately (tasks to run) we prefer to
|
||||
# do that first unless some conditions are met, see below
|
||||
self.io_skip += 1
|
||||
if self.io_skip == IO_SKIP_LIMIT:
|
||||
if self.io_skip == self.io_skip_limit:
|
||||
# We can't skip every time there's some task ready
|
||||
# or else we might starve I/O waiting tasks
|
||||
# when a lot of things are running at the same time
|
||||
# or else we might starve I/O waiting tasks when a
|
||||
# lot of things are running at the same time
|
||||
self.io_skip = 0
|
||||
timeout = 86400
|
||||
timeout = self.io_max_timeout
|
||||
else:
|
||||
# If there are either tasks or events and no I/O, don't wait
|
||||
# (unless we already skipped this check too many times)
|
||||
|
@ -323,7 +389,7 @@ class AsyncScheduler:
|
|||
timeout = self.get_closest_deadline()
|
||||
else:
|
||||
# If there is *only* I/O, we wait a fixed amount of time
|
||||
timeout = 86400 # Stolen from trio :D
|
||||
timeout = self.io_max_timeout
|
||||
self.debugger.before_io(timeout)
|
||||
io_ready = self.selector.select(timeout)
|
||||
# Get sockets that are ready and schedule their tasks
|
||||
|
@ -405,27 +471,6 @@ class AsyncScheduler:
|
|||
self.get_io_tasks(),
|
||||
[self.current_task])
|
||||
|
||||
def ensure_discard(self, task: Task):
|
||||
"""
|
||||
Ensures that tasks that need to be cancelled are not
|
||||
rescheduled further. This method exists because tasks might
|
||||
be cancelled in a context where it's not obvious which Task
|
||||
object must be discarded and not rescheduled at the next iteration
|
||||
"""
|
||||
|
||||
# TODO: Do we need else ifs or ifs? The question arises because
|
||||
# tasks might be doing I/O and other stuff too even if not at the same time
|
||||
if task in self.paused:
|
||||
self.paused.discard(task)
|
||||
if self.selector.get_map():
|
||||
for key in dict(self.selector.get_map()).values():
|
||||
if key.data == task:
|
||||
self.selector.unregister(key.fileobj)
|
||||
if task in self.get_event_tasks():
|
||||
for evt in self.events:
|
||||
if task in evt.waiters:
|
||||
evt.waiters.remove(task)
|
||||
|
||||
def cancel_all(self) -> bool:
|
||||
"""
|
||||
Cancels ALL tasks as returned by self.get_all_tasks() and returns
|
||||
|
@ -538,19 +583,25 @@ class AsyncScheduler:
|
|||
or does so straight away if it is safe to do so
|
||||
"""
|
||||
|
||||
if task.done():
|
||||
self.ensure_discard(task)
|
||||
if task.done() or task.status == "init":
|
||||
# The task isn't running already!
|
||||
task.cancel_pending = False
|
||||
return
|
||||
elif task.status in ("io", "sleep", "init"):
|
||||
elif task.status in ("io", "sleep"):
|
||||
# We cancel immediately only in a context where it's safer to do
|
||||
# so. The concept of "safer" is quite tricky, because even though the
|
||||
# task is technically not running, it might leave some unfinished state
|
||||
# or dangling resource open after being cancelled, so maybe we need
|
||||
# a different approach altogether
|
||||
if task.status == "io":
|
||||
for k in filter(lambda o: o.data == task,
|
||||
dict(self.selector.get_map()).values()):
|
||||
self.selector.unregister(k.fileobj)
|
||||
elif task.status == "sleep":
|
||||
self.paused.discard(task)
|
||||
try:
|
||||
self.do_cancel(task)
|
||||
except CancelledError:
|
||||
except CancelledError as cancel:
|
||||
# When a task needs to be cancelled, giambio tries to do it gracefully
|
||||
# first: if the task is paused in either I/O or sleeping, that's perfect.
|
||||
# But we also need to cancel a task if it was not sleeping or waiting on
|
||||
|
@ -560,11 +611,12 @@ class AsyncScheduler:
|
|||
# to re-raise cancellations at every checkpoint until the task lets the
|
||||
# exception propagate into us, because we *really* want the task to be
|
||||
# cancelled
|
||||
task.status = "cancelled"
|
||||
task.cancelled = True
|
||||
task = cancel.task
|
||||
task.cancel_pending = False
|
||||
task.cancelled = True
|
||||
task.status = "cancelled"
|
||||
self.io_release_task(self.current_task)
|
||||
self.debugger.after_cancel(task)
|
||||
self.ensure_discard(task)
|
||||
else:
|
||||
# If we can't cancel in a somewhat "graceful" way, we just
|
||||
# defer this operation for later (check run() for more info)
|
||||
|
@ -606,7 +658,6 @@ class AsyncScheduler:
|
|||
|
||||
:param sock: The socket on which a read or write operation
|
||||
has to be performed
|
||||
:type sock: socket.socket
|
||||
:param evt_type: The type of event to perform on the given
|
||||
socket, either "read" or "write"
|
||||
:type evt_type: str
|
||||
|
|
|
@ -51,8 +51,10 @@ class Task:
|
|||
# time by the event loop, internally. Possible values for this are "init"--
|
||||
# when the task has been created but not started running yet--, "run"-- when
|
||||
# the task is running synchronous code--, "io"-- when the task is waiting on
|
||||
# an I/O resource--, "sleep"-- when the task is either asleep or waiting on an event
|
||||
# and "crashed"-- when the task has exited because of an exception
|
||||
# an I/O resource--, "sleep"-- when the task is either asleep or waiting on
|
||||
# an event, "crashed"-- when the task has exited because of an exception
|
||||
# and "cancelled" when-- when the task has been explicitly cancelled with
|
||||
# its cancel() method or as a result of an exception
|
||||
status: str = "init"
|
||||
# This attribute counts how many times the task's run() method has been called
|
||||
steps: int = 0
|
||||
|
@ -130,6 +132,17 @@ class Task:
|
|||
|
||||
return self.exc or self.finished or self.cancelled
|
||||
|
||||
def __del__(self):
|
||||
"""
|
||||
Task destructor
|
||||
"""
|
||||
|
||||
try:
|
||||
self.coroutine.close()
|
||||
except RuntimeError:
|
||||
pass # TODO: This is kinda bad
|
||||
assert not self.last_io
|
||||
|
||||
|
||||
class Event:
|
||||
"""
|
||||
|
@ -143,7 +156,6 @@ class Event:
|
|||
|
||||
self.set = False
|
||||
self.waiters = []
|
||||
self.event_caught = False
|
||||
|
||||
async def trigger(self):
|
||||
"""
|
||||
|
|
|
@ -16,13 +16,18 @@ limitations under the License.
|
|||
"""
|
||||
|
||||
import ssl
|
||||
from socket import SOL_SOCKET, SO_ERROR
|
||||
import socket as builtin_socket
|
||||
from giambio.run import get_event_loop
|
||||
from giambio.exceptions import ResourceClosed
|
||||
from giambio.traps import want_write, want_read
|
||||
from ssl import SSLWantReadError, SSLWantWriteError
|
||||
|
||||
IOInterrupt = (BlockingIOError, InterruptedError, SSLWantReadError, SSLWantWriteError)
|
||||
try:
|
||||
from ssl import SSLWantReadError, SSLWantWriteError
|
||||
WantRead = (BlockingIOError, InterruptedError, SSLWantReadError)
|
||||
WantWrite = (BlockingIOError, InterruptedError, SSLWantWriteError)
|
||||
except ImportError:
|
||||
WantRead = (BlockingIOError, InterruptedError)
|
||||
WantWrite = (BlockingIOError, InterruptedError)
|
||||
|
||||
|
||||
class AsyncSocket:
|
||||
|
@ -32,61 +37,55 @@ class AsyncSocket:
|
|||
|
||||
def __init__(self, sock):
|
||||
self.sock = sock
|
||||
self.loop = get_event_loop()
|
||||
self._closed = False
|
||||
self._fd = sock.fileno()
|
||||
self.sock.setblocking(False)
|
||||
|
||||
async def receive(self, max_size: int):
|
||||
|
||||
async def receive(self, max_size: int, flags: int = 0) -> bytes:
|
||||
"""
|
||||
Receives up to max_size bytes from a socket asynchronously
|
||||
"""
|
||||
|
||||
if self._closed:
|
||||
raise ResourceClosed("I/O operation on closed socket")
|
||||
assert max_size >= 1, "max_size must be >= 1"
|
||||
if isinstance(self.sock, ssl.SSLSocket) and self.sock.pending():
|
||||
print(self.sock.pending())
|
||||
return self.sock.recv(self.sock.pending())
|
||||
await want_read(self.sock)
|
||||
data = b""
|
||||
if self._fd == -1:
|
||||
raise ResourceClosed("I/O operation on closed socket")
|
||||
while True:
|
||||
try:
|
||||
return self.sock.recv(max_size)
|
||||
except IOInterrupt:
|
||||
return self.sock.recv(max_size, flags)
|
||||
except WantRead:
|
||||
await want_read(self.sock)
|
||||
return self.sock.recv(max_size)
|
||||
except WantWrite:
|
||||
await want_write(self.sock)
|
||||
|
||||
async def accept(self):
|
||||
"""
|
||||
Accepts the socket, completing the 3-step TCP handshake asynchronously
|
||||
"""
|
||||
|
||||
if self._closed:
|
||||
if self.sock == -1:
|
||||
raise ResourceClosed("I/O operation on closed socket")
|
||||
await want_read(self.sock)
|
||||
while True:
|
||||
try:
|
||||
to_wrap = self.sock.accept()
|
||||
except IOInterrupt:
|
||||
# Some platforms (namely OSX systems) act weird and handle
|
||||
# the errno 35 signal (EAGAIN) for sockets in a weird manner,
|
||||
# and this seems to fix the issue. Not sure about why since we
|
||||
# already called want_read above, but it ain't stupid if it works I guess
|
||||
remote, addr = self.sock.accept()
|
||||
return wrap_socket(remote), addr
|
||||
except WantRead:
|
||||
await want_read(self.sock)
|
||||
to_wrap = self.sock.accept()
|
||||
return wrap_socket(to_wrap[0]), to_wrap[1]
|
||||
|
||||
async def send_all(self, data: bytes):
|
||||
async def send_all(self, data: bytes, flags: int = 0):
|
||||
"""
|
||||
Sends all data inside the buffer asynchronously until it is empty
|
||||
"""
|
||||
|
||||
if self._closed:
|
||||
if self.sock == -1:
|
||||
raise ResourceClosed("I/O operation on closed socket")
|
||||
while data:
|
||||
await want_write(self.sock)
|
||||
try:
|
||||
sent_no = self.sock.send(data)
|
||||
except IOInterrupt:
|
||||
sent_no = self.sock.send(data, flags)
|
||||
except WantRead:
|
||||
await want_read(self.sock)
|
||||
except WantWrite:
|
||||
await want_write(self.sock)
|
||||
sent_no = self.sock.send(data)
|
||||
data = data[sent_no:]
|
||||
|
||||
async def close(self):
|
||||
|
@ -94,30 +93,26 @@ class AsyncSocket:
|
|||
Closes the socket asynchronously
|
||||
"""
|
||||
|
||||
if self._closed:
|
||||
if self.sock == -1:
|
||||
raise ResourceClosed("I/O operation on closed socket")
|
||||
await want_write(self.sock)
|
||||
try:
|
||||
await release_sock(self.sock)
|
||||
self.sock.close()
|
||||
except IOInterrupt:
|
||||
await want_write(self.sock)
|
||||
self.sock.close()
|
||||
self.loop.selector.unregister(self.sock)
|
||||
self.loop.current_task.last_io = ()
|
||||
self._closed = True
|
||||
self._sock = None
|
||||
self.sock = -1
|
||||
|
||||
async def connect(self, addr: tuple):
|
||||
"""
|
||||
Connects the socket to an endpoint
|
||||
"""
|
||||
|
||||
if self._closed:
|
||||
if self.sock == -1:
|
||||
raise ResourceClosed("I/O operation on closed socket")
|
||||
await want_write(self.sock)
|
||||
try:
|
||||
self.sock.connect(addr)
|
||||
except IOInterrupt as io_interrupt:
|
||||
except WantWrite:
|
||||
await want_write(self.sock)
|
||||
self.sock.connect(addr)
|
||||
|
||||
|
||||
async def bind(self, addr: tuple):
|
||||
"""
|
||||
|
@ -127,7 +122,7 @@ class AsyncSocket:
|
|||
:type addr: tuple
|
||||
"""
|
||||
|
||||
if self._closed:
|
||||
if self.sock == -1:
|
||||
raise ResourceClosed("I/O operation on closed socket")
|
||||
self.sock.bind(addr)
|
||||
|
||||
|
@ -139,29 +134,88 @@ class AsyncSocket:
|
|||
:type backlog: int
|
||||
"""
|
||||
|
||||
if self._closed:
|
||||
if self.sock == -1:
|
||||
raise ResourceClosed("I/O operation on closed socket")
|
||||
self.sock.listen(backlog)
|
||||
|
||||
def __del__(self):
|
||||
"""
|
||||
Implements the destructor for the async socket,
|
||||
notifying the event loop that the socket must not
|
||||
be listened for anymore. This avoids the loop
|
||||
blocking forever on trying to read from a socket
|
||||
that's gone out of scope without being closed
|
||||
"""
|
||||
|
||||
if not self._closed and self.loop.selector.get_map() and self.sock in self.loop.selector.get_map():
|
||||
self.loop.selector.unregister(self.sock)
|
||||
self.loop.current_task.last_io = ()
|
||||
self._closed = True
|
||||
|
||||
async def __aenter__(self):
|
||||
self.sock.__enter__()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *_):
|
||||
await self.close()
|
||||
async def __aexit__(self, *args):
|
||||
if self.sock:
|
||||
self.sock.__exit__(*args)
|
||||
|
||||
# Yes, I stole these from Curio because I could not be
|
||||
# arsed to write a bunch of uninteresting simple socket
|
||||
# methods from scratch, deal with it.
|
||||
|
||||
async def connect(self, address):
|
||||
try:
|
||||
result = self.sock.connect(address)
|
||||
if getattr(self, 'do_handshake_on_connect', False):
|
||||
await self.do_handshake()
|
||||
return result
|
||||
except WantWrite:
|
||||
await want_write(self.sock)
|
||||
err = self.sock.getsockopt(SOL_SOCKET, SO_ERROR)
|
||||
if err != 0:
|
||||
raise OSError(err, f'Connect call failed {address}')
|
||||
if getattr(self, 'do_handshake_on_connect', False):
|
||||
await self.do_handshake()
|
||||
|
||||
async def recvfrom(self, buffersize, flags=0):
|
||||
while True:
|
||||
try:
|
||||
return self.sock.recvfrom(buffersize, flags)
|
||||
except WantRead:
|
||||
await want_read(self.sock)
|
||||
except WantWrite:
|
||||
await want_write(self.sock)
|
||||
|
||||
async def recvfrom_into(self, buffer, bytes=0, flags=0):
|
||||
while True:
|
||||
try:
|
||||
return self.sock.recvfrom_into(buffer, bytes, flags)
|
||||
except WantRead:
|
||||
await want_read(self.sock)
|
||||
except WantWrite:
|
||||
await want_write(self.sock)
|
||||
|
||||
async def sendto(self, bytes, flags_or_address, address=None):
|
||||
if address:
|
||||
flags = flags_or_address
|
||||
else:
|
||||
address = flags_or_address
|
||||
flags = 0
|
||||
while True:
|
||||
try:
|
||||
return self.sock.sendto(bytes, flags, address)
|
||||
except WantWrite:
|
||||
await want_write(self.sock)
|
||||
except WantRead:
|
||||
await want_read(self.sock)
|
||||
|
||||
async def recvmsg(self, bufsize, ancbufsize=0, flags=0):
|
||||
while True:
|
||||
try:
|
||||
return self.sock.recvmsg(bufsize, ancbufsize, flags)
|
||||
except WantRead:
|
||||
await want_read(self.sock)
|
||||
|
||||
async def recvmsg_into(self, buffers, ancbufsize=0, flags=0):
|
||||
while True:
|
||||
try:
|
||||
return self.sock.recvmsg_into(buffers, ancbufsize, flags)
|
||||
except WantRead:
|
||||
await want_read(self.sock)
|
||||
|
||||
async def sendmsg(self, buffers, ancdata=(), flags=0, address=None):
|
||||
while True:
|
||||
try:
|
||||
return self.sock.sendmsg(buffers, ancdata, flags, address)
|
||||
except WantRead:
|
||||
await want_write(self.sock)
|
||||
|
||||
def __repr__(self):
|
||||
return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})"
|
||||
|
|
|
@ -38,14 +38,15 @@ def create_trap(method, *args):
|
|||
|
||||
async def sleep(seconds: int):
|
||||
"""
|
||||
Pause the execution of an async function for a given amount of seconds,
|
||||
without blocking the entire event loop, which keeps watching for other events.
|
||||
Pause the execution of an async function for a given amount of seconds.
|
||||
This function is functionally equivalent to time.sleep, but can be used
|
||||
within async code without blocking everything else.
|
||||
|
||||
This function is also useful as a sort of checkpoint, because it returns
|
||||
control to the scheduler, which can then switch to another task. If your code
|
||||
doesn't have enough calls to async functions (or 'checkpoints') this might
|
||||
prevent the scheduler from switching tasks properly. If you feel like this
|
||||
happens in your code, try adding a call to giambio.sleep(0) somewhere.
|
||||
happens in your code, try adding a call to await giambio.sleep(0) somewhere.
|
||||
This will act as a checkpoint without actually pausing the execution
|
||||
of your function, but it will allow the scheduler to switch tasks
|
||||
|
||||
|
@ -57,6 +58,16 @@ async def sleep(seconds: int):
|
|||
await create_trap("sleep", seconds)
|
||||
|
||||
|
||||
async def io_release(resource):
|
||||
"""
|
||||
Notifies the event loop to release
|
||||
the resources associated to the given
|
||||
socket and stop listening on it
|
||||
"""
|
||||
|
||||
await create_trap("io_release", resource)
|
||||
|
||||
|
||||
async def current_task():
|
||||
"""
|
||||
Gets the currently running task in an asynchronous fashion
|
||||
|
|
|
@ -30,4 +30,4 @@ async def main():
|
|||
|
||||
|
||||
if __name__ == "__main__":
|
||||
giambio.run(main, debugger=())
|
||||
giambio.run(main, debugger=Debugger())
|
||||
|
|
|
@ -63,4 +63,5 @@ if __name__ == "__main__":
|
|||
if isinstance(error, KeyboardInterrupt):
|
||||
logging.info("Ctrl+C detected, exiting")
|
||||
else:
|
||||
raise
|
||||
logging.error(f"Exiting due to a {type(error).__name__}: {error}")
|
||||
|
|
Loading…
Reference in New Issue