Moved socket functionality out of the loop and added some more functions to the socket module (updating examples)

nocturn9x 2021-04-23 09:17:55 +02:00
parent 79cbba994c
commit f55826d534
9 changed files with 132 additions and 142 deletions

View File

@ -19,6 +19,13 @@ rock-solid and structured concurrency framework (I personally recommend trio and
that most of the content of this document is ~~stolen~~ inspired from its documentation)
## Current limitations
As I already mentioned, giambio is **highly** experimental and there's a lot of work to do before it's usable. Namely:
- Ensure cancellations work 100% of the time even when `await`ing functions and not spawning them
- Extend I/O functionality
- Add task synchronization primitives such as locks and semaphores (events *sort of* work now)
# What the hell is async anyway?
Libraries like giambio shine the most when it comes to performing asynchronous I/O (reading from a socket, writing to a file, that sort of thing).
@ -539,29 +546,26 @@ clients and dispatch them to some other handler.
```python
import giambio
import socket
import logging
async def serve(bind_address: tuple):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(bind_address)
sock.listen(5)
async_sock = giambio.wrap_socket(sock) # We make the socket an async socket
sock = giambio.socket.socket()
await sock.bind(bind_address)
await sock.listen(5)
logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}")
async with giambio.create_pool() as pool:
while True:
conn, address_tuple = await async_sock.accept()
conn, address_tuple = await sock.accept()
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
pool.spawn(handler, conn, address_tuple)
```
So, our `serve` function does a few things:
- Sets up our server socket, just like in a synchronous server (notice how we bind and listen **before** wrapping it)
- Uses giambio's `wrap_socket` function to wrap the plain old synchronous socket into an async one
- Sets up our server socket, just like in a synchronous server
- Opens a task pool and starts listening for clients in a loop using our new `giambio.socket.AsyncSocket` object
- Notice how we use `await async_sock.accept()` and not `sock.accept()`, because that could block the loop
- Notice how we use `await sock.accept()` and not `sock.accept()`, because that is an asynchronous socket!
- Once a client connects, we log some information, spawn a new task and pass it the client socket: that is our client handler
So, let's go over the declaration of `handler` then:
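Before we do, here is a rough idea of the shape such a handler takes under the new `giambio.socket` API. This is just an illustrative echo-style sketch (it reuses the `logging` import from the snippet above and assumes the async socket's read method is called `receive`), not the actual code we're about to dissect:
```python
async def handler(sock, client_address: tuple):
    address = f"{client_address[0]}:{client_address[1]}"
    while True:
        # receive() suspends until the socket is readable instead of blocking the loop
        data = await sock.receive(1024)
        if not data:
            # An empty read means the client disconnected
            break
        logging.info(f"Echoing {data} back to {address}")
        await sock.send_all(data)
    await sock.close()
```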

View File

@ -21,7 +21,6 @@ __version__ = (0, 0, 1)
from . import exceptions, socket, context, core
from .socket import wrap_socket
from .traps import sleep, current_task
from .objects import Event
from .run import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout
@ -36,11 +35,10 @@ __all__ = [
"Event",
"run",
"clock",
"wrap_socket",
"create_pool",
"with_timeout",
"get_event_loop",
"current_task",
"new_event_loop",
"debug"
"debug",
]

View File

@ -23,22 +23,19 @@ from typing import List
class TaskManager:
"""
An asynchronous context manager for giambio
An asynchronous context manager for giambio, similar to trio's nurseries
:param loop: The event loop bound to this pool. Most of the time
it's the return value from giambio.get_event_loop()
:type loop: :class: AsyncScheduler
:param timeout: The pool's timeout length in seconds, if any, defaults to None
:type timeout: float, optional
"""
def __init__(self, loop: "giambio.core.AsyncScheduler", timeout: float = None) -> None:
def __init__(self, timeout: float = None) -> None:
"""
Object constructor
"""
# The event loop associated with this pool
self.loop: "giambio.core.AsyncScheduler" = loop
self.loop: giambio.core.AsyncScheduler = giambio.get_event_loop()
# All the tasks that belong to this pool
self.tasks: List[giambio.objects.Task] = []
# Whether we have been cancelled or not
@ -85,7 +82,6 @@ class TaskManager:
async def __aenter__(self):
"""
Implements the asynchronous context manager interface,
marking the pool as started and returning itself
"""
return self

View File

@ -142,6 +142,13 @@ class AsyncScheduler:
# Then we try to awake event-waiting tasks
if self.events:
self.check_events()
if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out:
# Stores deadlines for tasks (deadlines are pool-specific).
# The deadlines queue will internally make sure not to store
# a deadline for the same pool twice. This makes the timeouts
# model less flexible, because one can't change the timeout
# after it is set, but it makes the implementation easier.
self.deadlines.put(self.current_pool)
# Otherwise, while there are tasks ready to run, we run them!
while self.tasks:
# Sets the currently running task
@ -151,13 +158,6 @@ class AsyncScheduler:
# we still need to make sure we don't try to execute
# exited tasks that are on the running queue
continue
if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out:
# Stores deadlines for tasks (deadlines are pool-specific).
# The deadlines queue will internally make sure not to store
# a deadline for the same pool twice. This makes the timeouts
# model less flexible, because one can't change the timeout
# after it is set, but it makes the implementation easier.
self.deadlines.put(self.current_pool)
self.debugger.before_task_step(self.current_task)
if self.current_task.cancel_pending:
# We perform the deferred cancellation
@ -244,12 +244,8 @@ class AsyncScheduler:
while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock():
pool = self.deadlines.get()
pool.timed_out = True
self.cancel_pool(pool)
# Since we already know that exceptions will behave correctly
# (heck, half of the code in here only serves that purpose)
# all we do here is just raise an exception as if the current
# task raised it and let our machinery deal with the rest
raise TooSlowError()
if not self.current_task.done():
self.current_task.throw(TooSlowError())
def check_events(self):
"""
@ -393,8 +389,9 @@ class AsyncScheduler:
Yields all tasks currently waiting on I/O resources
"""
for k in self.selector.get_map().values():
yield k.data
if self.selector.get_map():
for k in self.selector.get_map().values():
yield k.data
def get_all_tasks(self) -> chain:
"""
@ -407,7 +404,8 @@ class AsyncScheduler:
return chain(self.tasks,
self.get_asleep_tasks(),
self.get_event_tasks(),
self.get_io_tasks())
self.get_io_tasks(),
[self.current_task])
def ensure_discard(self, task: Task):
"""
@ -472,12 +470,13 @@ class AsyncScheduler:
# occur
self.tasks.append(t)
# noinspection PyMethodMayBeStatic
def is_pool_done(self, pool: TaskManager) -> bool:
"""
Returns true if the given pool has finished
running and can be safely terminated
:return: Whether the pool and any enclosing pools finished running
:return: Whether the pool finished running
:rtype: bool
"""
@ -647,79 +646,6 @@ class AsyncScheduler:
# The socket is already registered doing something else
raise ResourceBusy("The given socket is being read/written by another task") from None
# noinspection PyMethodMayBeStatic
async def read_sock(self, sock: socket.socket, buffer: int):
"""
Reads from a socket asynchronously, waiting until the resource is
available and returning up to buffer bytes from the socket
:param sock: The socket that must be read
:type sock: socket.socket
:param buffer: The maximum amount of bytes that will be read
:type buffer: int
"""
await want_read(sock)
return sock.recv(buffer)
# noinspection PyMethodMayBeStatic
async def accept_sock(self, sock: socket.socket):
"""
Accepts a socket connection asynchronously, waiting until the resource
is available and returning the result of the sock.accept() call
:param sock: The socket that must be accepted
:type sock: socket.socket
"""
await want_read(sock)
try:
return sock.accept()
except BlockingIOError:
# Some platforms (namely OSX systems) handle the errno 35
# (EAGAIN) error on sockets in a peculiar way, and this seems
# to fix the issue. Not sure why, since we already called
# want_read above, but it ain't stupid if it works I guess
await want_read(sock)
return sock.accept()
# noinspection PyMethodMayBeStatic
async def sock_sendall(self, sock: socket.socket, data: bytes):
"""
Sends all the passed bytes through the given socket, asynchronously
:param sock: The socket that must be written
:type sock: socket.socket
:param data: The bytes to send across the socket
:type data: bytes
"""
while data:
await want_write(sock)
try:
sent_no = sock.send(data)
except BlockingIOError:
await want_write(sock)
sent_no = sock.send(data)
data = data[sent_no:]
async def close_sock(self, sock: socket.socket):
"""
Closes the given socket asynchronously
:param sock: The socket that must be closed
:type sock: socket.socket
"""
await want_write(sock)
try:
sock.close()
except BlockingIOError:
await want_write(sock)
sock.close()
self.selector.unregister(sock)
self.current_task.last_io = ()
# noinspection PyMethodMayBeStatic
async def connect_sock(self, sock: socket.socket, address_tuple: tuple):
"""

View File

@ -203,8 +203,8 @@ class TimeQueue:
or -1 if it is not present
"""
for i in self.container:
if i[2] == item:
for i, e in enumerate(self.container):
if e[2] == item:
return i
return -1
@ -222,8 +222,8 @@ class TimeQueue:
"""
idx = self.index(item)
if idx != 1:
self.container.remove(idx)
if idx != -1:
self.container.pop(idx)
heapify(self.container)
def get_closest_deadline(self) -> float:
@ -333,8 +333,8 @@ class DeadlinesQueue:
or -1 if it is not present
"""
for i in self.container:
if i[2] == item:
for i, e in enumerate(self.container):
if e[2] == item:
return i
return -1
@ -353,7 +353,7 @@ class DeadlinesQueue:
idx = self.index(item)
if idx != 1:
self.container.remove(idx)
self.container.pop(idx)
heapify(self.container)
def get_closest_deadline(self) -> float:
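The `index`/`discard` fixes above follow a common pattern for heap-backed containers: locate an entry's position with `enumerate`, `pop()` it by index, then `heapify()` to restore the heap invariant. A minimal standalone sketch of that pattern (illustrative only, not giambio's actual queue classes):
```python
from heapq import heappush, heapify


class MiniDeadlineQueue:
    def __init__(self):
        # Entries are (deadline, tiebreaker, payload) tuples kept as a heap
        self.container = []
        self.sequence = 0

    def put(self, deadline: float, payload):
        heappush(self.container, (deadline, self.sequence, payload))
        self.sequence += 1

    def index(self, payload) -> int:
        # Position of the entry holding payload, or -1 if it is not present
        for i, entry in enumerate(self.container):
            if entry[2] == payload:
                return i
        return -1

    def discard(self, payload):
        idx = self.index(payload)
        if idx != -1:
            self.container.pop(idx)
            heapify(self.container)  # pop() by index breaks the heap order, so rebuild it


queue = MiniDeadlineQueue()
queue.put(5.0, "pool-a")
queue.put(2.0, "pool-b")
queue.discard("pool-a")
assert queue.index("pool-a") == -1
```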

View File

@ -91,7 +91,7 @@ def create_pool():
"""
loop = get_event_loop()
pool = TaskManager(loop)
pool = TaskManager()
loop.current_pool = pool
return pool
@ -102,6 +102,9 @@ def with_timeout(timeout: int or float):
"""
loop = get_event_loop()
pool = TaskManager(loop, timeout)
# We add 1 to make the timeout intuitive and inclusive (i.e.
# a 10-second timeout means the task is allowed to run 10
# whole seconds instead of cancelling at the tenth second)
pool = TaskManager(timeout + 1)
loop.current_pool = pool
return pool

View File

@ -15,9 +15,10 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
import socket as builtin_socket
from giambio.run import get_event_loop
from giambio.exceptions import ResourceClosed
from giambio.traps import want_write, want_read
class AsyncSocket:
@ -25,7 +26,7 @@ class AsyncSocket:
Abstraction layer for asynchronous sockets
"""
def __init__(self, sock: socket.socket):
def __init__(self, sock: builtin_socket.socket):
self.sock = sock
self.loop = get_event_loop()
self._closed = False
@ -38,7 +39,13 @@ class AsyncSocket:
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
return await self.loop.read_sock(self.sock, max_size)
assert max_size >= 1, "max_size must be >= 1"
await want_read(self.sock)
try:
return self.sock.recv(max_size)
except BlockingIOError:
await want_read(self.sock)
return self.sock.recv(max_size)
async def accept(self):
"""
@ -47,7 +54,16 @@ class AsyncSocket:
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
to_wrap = await self.loop.accept_sock(self.sock)
await want_read(self.sock)
try:
to_wrap = self.sock.accept()
except BlockingIOError:
# Some platforms (namely OSX systems) handle the errno 35
# (EAGAIN) error on sockets in a peculiar way, and this seems
# to fix the issue. Not sure why, since we already called
# want_read above, but it ain't stupid if it works I guess
await want_read(self.sock)
to_wrap = self.sock.accept()
return wrap_socket(to_wrap[0]), to_wrap[1]
async def send_all(self, data: bytes):
@ -57,7 +73,14 @@ class AsyncSocket:
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
return await self.loop.sock_sendall(self.sock, data)
while data:
await want_write(self.sock)
try:
sent_no = self.sock.send(data)
except BlockingIOError:
await want_write(self.sock)
sent_no = self.sock.send(data)
data = data[sent_no:]
async def close(self):
"""
@ -66,7 +89,14 @@ class AsyncSocket:
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
await self.loop.close_sock(self.sock)
await want_write(self.sock)
try:
self.sock.close()
except BlockingIOError:
await want_write(self.sock)
self.sock.close()
self.loop.selector.unregister(self.sock)
self.loop.current_task.last_io = ()
self._closed = True
async def connect(self, addr: tuple):
@ -76,7 +106,36 @@ class AsyncSocket:
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
await self.loop.connect_sock(self.sock, addr)
await want_write(self.sock)
try:
self.sock.connect(addr)
except BlockingIOError:
await want_write(self.sock)
self.sock.connect(addr)
async def bind(self, addr: tuple):
"""
Binds the socket to an address
:param addr: The address, port tuple to bind to
:type addr: tuple
"""
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
self.sock.bind(addr)
async def listen(self, backlog: int):
"""
Starts listening with the given backlog
:param backlog: The maximum number of pending connections that will be queued
:type backlog: int
"""
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
self.sock.listen(backlog)
def __del__(self):
"""
@ -100,9 +159,20 @@ class AsyncSocket:
return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})"
def wrap_socket(sock: socket.socket) -> AsyncSocket:
def wrap_socket(sock: builtin_socket.socket) -> AsyncSocket:
"""
Wraps a standard socket into an async socket
"""
return AsyncSocket(sock)
def socket(*args, **kwargs):
"""
Creates a new giambio socket, taking in the same positional and
keyword arguments as the standard library's socket.socket
constructor
"""
return AsyncSocket(builtin_socket.socket(*args, **kwargs))
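With `socket()`, `connect()` and `send_all()` now living on the async wrapper, a client can be written entirely against `giambio.socket`, mirroring the server example in the README. A hedged sketch of what such a client might look like (the host, port, message and the `receive` method name are assumptions for illustration):
```python
import giambio


async def client():
    # giambio.socket.socket() takes the same arguments as the standard
    # library's socket.socket() and returns an AsyncSocket
    sock = giambio.socket.socket()
    await sock.connect(("localhost", 1501))
    await sock.send_all(b"hello from an async client\n")
    reply = await sock.receive(1024)
    print(f"Server replied: {reply}")
    await sock.close()


if __name__ == "__main__":
    giambio.run(client)
```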

View File

@ -1,7 +1,5 @@
import giambio
from giambio.socket import AsyncSocket
from debugger import Debugger
import socket
import logging
import sys
@ -16,14 +14,13 @@ async def serve(bind_address: tuple):
(address, port) where address is a string and port is an integer
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(bind_address)
sock.listen(5)
async_sock = giambio.wrap_socket(sock) # We make the socket an async socket
sock = giambio.socket.socket()
await sock.bind(bind_address)
await sock.listen(5)
logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}")
async with giambio.create_pool() as pool:
while True:
conn, address_tuple = await async_sock.accept()
conn, address_tuple = await sock.accept()
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
pool.spawn(handler, conn, address_tuple)
@ -61,7 +58,7 @@ if __name__ == "__main__":
port = int(sys.argv[1]) if len(sys.argv) > 1 else 1501
logging.basicConfig(level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p")
try:
giambio.run(serve, ("localhost", port), debugger=None)
giambio.run(serve, ("localhost", port))
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
if isinstance(error, KeyboardInterrupt):
logging.info("Ctrl+C detected, exiting")

View File

@ -11,17 +11,13 @@ async def child(name: int):
async def main():
start = giambio.clock()
try:
async with giambio.with_timeout(6) as pool:
# TODO: We need to consider the inner part of
# the with block as an implicit task, otherwise
# timeouts and cancellations won't work with await fn()!
pool.spawn(child, 5) # This will complete
pool.spawn(child, 10) # This will not
print("[main] Children spawned, awaiting completion")
async with giambio.with_timeout(10) as pool:
pool.spawn(child, 7) # This will complete
await child(20) # TODO: Broken
except giambio.exceptions.TooSlowError:
print("[main] One or more children have timed out!")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":
giambio.run(main, debugger=())
giambio.run(main, debugger=Debugger())
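The body of `child` sits outside this hunk; judging from how it is used above, it presumably just sleeps for `name` seconds and reports its progress. A hedged sketch of what it might look like:
```python
async def child(name: int):
    print(f"[child {name}] Sleeping for {name} seconds")
    await giambio.sleep(name)  # giambio.sleep comes from giambio.traps, re-exported in __init__.py
    print(f"[child {name}] Done!")
```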