Various improvements to async pools

This commit is contained in:
nocturn9x 2020-11-16 08:07:19 +01:00
parent cc9eccf027
commit 981a598ae7
11 changed files with 685 additions and 559 deletions

View File

@ -16,10 +16,13 @@ limitations under the License.
__author__ = "Nocturn9x aka Isgiambyy"
__version__ = (1, 0, 0)
from ._run import run, clock, wrap_socket, create_pool
from .exceptions import GiambioError, AlreadyJoinedError, CancelledError
from ._traps import sleep
from ._layers import Event
from .traps import sleep, current_task
from .objects import Event
from .run import run, clock, wrap_socket, create_pool, get_event_loop, new_event_loop
__all__ = [
@ -30,5 +33,8 @@ __all__ = [

View File

@ -1,335 +0,0 @@
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
# Import libraries and internal resources
import types
from collections import defaultdict
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
import socket
from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy, GiambioError
from timeit import default_timer
from time import sleep as wait
from .socket import AsyncSocket, WantWrite, WantRead
from ._layers import Task, TimeQueue
from socket import SOL_SOCKET, SO_ERROR
from ._traps import want_read, want_write
import traceback, sys
class AsyncScheduler:
An asynchronous scheduler toy implementation. Tries to mimic the threaded
model in its simplicity, without using actual threads, but rather alternating
across coroutines execution to let more than one thing at a time to proceed
with its calculations. An attempt to fix the threaded model underlying pitfalls
and weaknesses has been made, without making the API unnecessarily complicated.
A few examples are tasks cancellation and exception propagation.
Can perform (unreliably) socket I/O asynchronously.
def __init__(self):
"""Object constructor"""
self.tasks = [] # Tasks that are ready to run
self.selector = DefaultSelector() # Selector object to perform I/O multiplexing
self.current_task = None # This will always point to the currently running coroutine (Task object)
self.joined = (
) # Maps child tasks that need to be joined their respective parent task
self.clock = (
default_timer # Monotonic clock to keep track of elapsed time reliably
self.some_cancel = False
self.paused = TimeQueue(self.clock) # Tasks that are asleep = set() # All Event objects
self.event_waiting = defaultdict(list) # Coroutines waiting on event objects
self.sequence = 0
def _run(self):
Starts the loop and 'listens' for events until there are either ready or asleep tasks,
then exit. This behavior kinda reflects a kernel, as coroutines can request
the loop's functionality only trough some fixed entry points, which in turn yield and
give execution control to the loop itself.
while True:
if not self.selector.get_map() and not any(
[self.paused, self.tasks, self.event_waiting]
): # If there is nothing to do, just exit
elif not self.tasks:
if self.paused:
# If there are no actively running tasks, we try to schedule the asleep ones
if self.selector.get_map():
self._check_io() # The next step is checking for I/O
if self.event_waiting:
# Try to awake event-waiting tasks
while self.tasks: # While there are tasks to run
self.current_task = self.tasks.pop(0)
if self.some_cancel:
# Sets the currently running task
method, *args = # Run a single step with the calculation
self.current_task.status = "run"
getattr(self, f"_{method}")(*args)
# Sneaky method call, thanks to David Beazley for this ;)
except CancelledError:
self.current_task.cancelled = True
except StopIteration as e: # Coroutine ends
self.current_task.result = e.args[0] if e.args else None
self.current_task.finished = True
except RuntimeError:
except BaseException as error: # Coroutine raised
self.current_task.exc = error
def _check_cancel(self):
Checks for task cancellation
if self.current_task.status == "cancel": # Deferred cancellation
self.current_task.cancelled = True
def _check_events(self):
Checks for ready or expired events and triggers them
for event, tasks in self.event_waiting.copy().items():
if event._set:
event.event_caught = True
self.tasks.extend(tasks + [event.notifier])
def _check_sleeping(self):
Checks and reschedules sleeping tasks
wait(max(0.0, self.paused[0][0] - self.clock()))
# Sleep until the closest deadline in order not to waste CPU cycles
while self.paused[0][0] < self.clock():
# Reschedules tasks when their deadline has elapsed
if not self.paused:
def _check_io(self):
Checks and schedules task to perform I/O
timeout = 0.0 if self.tasks else None
# If there are no tasks ready wait indefinitely
io_ready =
# Get sockets that are ready and schedule their tasks
for key, _ in io_ready:
self.tasks.append( # Resource ready? Schedule its task
def start(self, func: types.FunctionType, *args):
Starts the event loop from a sync context
entry = Task(func(*args))
self._join(entry) # TODO -> Inspect this line, does it actually do anything useful?
return entry
def _reschedule_parent(self):
Reschedules the parent task of the
currently running task, if any
parent = self.joined.pop(self.current_task, None)
if parent:
return parent
# TODO: More generic I/O rather than just sockets
def _want_read(self, sock: socket.socket):
Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing
self.current_task.status = "I/O"
if self.current_task._last_io:
if self.current_task._last_io == ("READ", sock):
return # Socket is already scheduled!
self.current_task._last_io = "READ", sock
self.selector.register(sock, EVENT_READ, self.current_task)
except KeyError: # The socket is already registered doing something else
raise ResourceBusy("The given resource is busy!") from None
def _want_write(self, sock: socket.socket):
Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing
self.current_task.status = "I/O"
if self.current_task._last_io:
if self.current_task._last_io == ("WRITE", sock):
return # Socket is already scheduled!
self.selector.unregister(sock) # modify() causes issues
self.current_task._last_io = "WRITE", sock
self.selector.register(sock, EVENT_WRITE, self.current_task)
except KeyError:
raise ResourceBusy("The given resource is busy!") from None
def _join(self, child: types.coroutine):
Handler for the 'join' event, does some magic to tell the scheduler
to wait until the passed coroutine ends. The result of this call equals whatever the
coroutine returns or, if an exception gets raised, the exception will get propagated inside the
parent task
if child.cancelled or child.exc: # Task was cancelled or has errored
elif child.finished: # Task finished running
self.tasks.append(self.current_task) # Task has already finished
if child not in self.joined:
self.joined[child] = self.current_task
raise AlreadyJoinedError(
"Joining the same task multiple times is not allowed!"
def _sleep(self, seconds: int or float):
Puts the caller to sleep for a given amount of seconds
if seconds:
self.current_task.status = "sleep"
self.paused.put(self.current_task, seconds)
def _event_set(self, event):
Sets an event
event.notifier = self.current_task
event._set = True
def _event_wait(self, event):
Waits for an event
if event in
event.waiting -= 1
if event.waiting <= 0:
def _cancel(self, task):
Handler for the 'cancel' event, throws CancelledError inside a coroutine
in order to stop it from executing. The loop continues to execute as tasks
are independent
if not self.some_cancel:
self.some_cancel = True
task.status = "cancel" # Cancellation is deferred
def wrap_socket(self, sock):
Wraps a standard socket into an AsyncSocket object
return AsyncSocket(sock, self)
async def _read_sock(self, sock: socket.socket, buffer: int):
Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes
from the socket
await want_read(sock)
return sock.recv(buffer)
except WantRead:
await want_write(sock)
return sock.recv(buffer)
async def _accept_sock(self, sock: socket.socket):
Accepts a socket connection asynchronously, waiting until the resource is available and returning the
result of the accept() call
await want_read(sock)
return sock.accept()
async def _sock_sendall(self, sock: socket.socket, data: bytes):
Sends all the passed data, as bytes, trough the socket asynchronously
while data:
await want_write(sock)
sent_no = sock.send(data)
data = data[sent_no:]
async def _close_sock(self, sock: socket.socket):
Closes the socket asynchronously
await want_write(sock)
return sock.close()
async def _connect_sock(self, sock: socket.socket, addr: tuple):
Connects a socket asynchronously
try: # "Borrowed" from curio
return sock.connect(addr)
except WantWrite:
await want_write(sock)
err = sock.getsockopt(SOL_SOCKET, SO_ERROR)
if err != 0:
raise OSError(err, f"Connect call failed: {addr}")

View File

@ -1,118 +0,0 @@
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
# Helper methods to interact with the event loop
# These coroutines are the one and only way to interact
# with the event loop from the user's perspective, and
# the entire library is based on these traps
import types
import socket
def sleep(seconds: int):
"""Pause the execution of a coroutine for the passed amount of seconds,
without blocking the entire event loop, which keeps watching for other events
This function is also useful as a sort of checkpoint, because it returns the execution
control to the scheduler, which can then switch to another task. If a coroutine does not have
enough calls to async methods (or 'checkpoints'), e.g one that needs the 'await' keyword before it, this might
affect performance as it would prevent the scheduler from switching tasks properly. If you feel
like this happens in your code, try adding a call to giambio.sleep(0); this will act as a checkpoint without
actually pausing the execution of your coroutine
:param seconds: The amount of seconds to sleep for
:type seconds: int
assert seconds >= 0, "The time delay can't be negative"
yield "sleep", seconds
def join(task):
"""'Tells' the scheduler that the desired task MUST be awaited for completion
:param task: The task to join
:type task: class: Task
res = yield "join", task
return task.result
def cancel(task):
'Tells' the scheduler that the passed task must be cancelled
The concept of cancellation here is tricky, because there is no real way to 'stop' a
running task if not by raising an exception inside it and just ignore whatever the task
returns (and also hoping that the task won't cause collateral damage when exiting abruptly).
It is highly recommended that when you write a coroutine you take into account that it might
be cancelled at any time. Please note, though, that ignoring a giambio.exceptions.CancelledError
exception *will* break your code, so if you really wanna do that be sure to re-raise
it when done!
yield "cancel", task
assert task.cancelled, f"Coroutine ignored CancelledError"
def want_read(sock: socket.socket):
'Tells' the event loop that there is some coroutine that wants to read from the given socket
:param sock: The socket to perform the operation on
:type sock: class: socket.socket
yield "want_read", sock
def want_write(sock: socket.socket):
'Tells' the event loop that there is some coroutine that wants to write on the given socket
:param sock: The socket to perform the operation on
:type sock: class: socket.socket
yield "want_write", sock
def event_set(event):
"""Communicates to the loop that the given event object
must be set. This is important as the loop constantly
checks for active events to deliver them
yield "event_set", event
def event_wait(event):
Notifies the event loop that the current task has to wait
for the event to trigger
msg = yield "event_wait", event
return msg

View File

@ -14,9 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
from ._core import AsyncScheduler
from ._layers import Task
import types
from .core import AsyncScheduler
from .objects import Task
class TaskManager:
@ -30,15 +31,17 @@ class TaskManager:
self.loop = loop
self.tasks = []
def spawn(self, func: types.FunctionType, *args):
Spawns a child task
task = Task(func(*args))
task = Task(func(*args), func.__name__ or str(func))
task.parent = self.loop.current_task
return task
def spawn_after(self, func: types.FunctionType, n: int, *args):
@ -46,23 +49,19 @@ class TaskManager:
assert n >= 0, "The time delay can't be negative"
task = Task(func(*args))
task = Task(func(*args), func.__name__ or str(func))
task.parent = self.loop.current_task
self.loop.paused.put(task, n)
return task
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
for task in self.loop.tasks:
for task in self.tasks:
await task.join()
except BaseException as e:
for running_task in self.loop.tasks:
await running_task.cancel()
for _, __, asleep_task in self.loop.paused:
await asleep_task.cancel()
for waiting_tasks in self.loop.event_waiting.values():
for waiting_task in waiting_tasks:
await waiting_task.cancel()
raise e
except BaseException as task_error:
for dead in self.tasks:
await dead.cancel()
raise task.exc

giambio/ Normal file
View File

@ -0,0 +1,401 @@
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
# Import libraries and internal resources
import types
import socket
from time import sleep as wait
from timeit import default_timer
from .objects import Task, TimeQueue
from socket import SOL_SOCKET, SO_ERROR
from .traps import want_read, want_write
from collections import defaultdict, deque
from .socket import AsyncSocket, WantWrite, WantRead
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from .exceptions import (
# The main runtime environment for giambio
class AsyncScheduler:
An asynchronous scheduler implementation. Tries to mimic the threaded
model in its simplicity, without using actual threads, but rather alternating
across coroutines execution to let more than one thing at a time to proceed
with its calculations. An attempt to fix the threaded model has been made
without making the API unnecessarily complicated.
A few examples are tasks cancellation and exception propagation.
def __init__(self):
Object constructor
# Tasks that are ready to run
self.tasks = deque()
# Selector object to perform I/O multiplexing
self.selector = DefaultSelector()
# This will always point to the currently running coroutine (Task object)
self.current_task = None
# Monotonic clock to keep track of elapsed time reliably
self.clock = default_timer
# Tasks that are asleep
self.paused = TimeQueue(self.clock)
# All active Event objects = set()
# Coroutines waiting on event objects
self.event_waiting = defaultdict(list)
# Data to send back to a trap
self.to_send = None
# Have we ever ran?
self.has_ran = False
def done(self):
Returns True if there is work to do
if self.selector.get_map() or any([self.paused, self.tasks, self.event_waiting]):
return False
return True
def shutdown(self):
Shuts down the event loop
def run(self):
Starts the loop and 'listens' for events until there is work to do,
then exits. This behavior kinda reflects a kernel, as coroutines can
request the loop's functionality only trough some fixed entry points,
which in turn yield and give execution control to the loop itself.
while True:
if self.done():
elif not self.tasks:
if self.paused:
# If there are no actively running tasks
# we try to schedule the asleep ones
if self.selector.get_map():
# The next step is checking for I/O
if self.event_waiting:
# Try to awake event-waiting tasks
# While there are tasks to run
while self.tasks:
# Sets the currently running task
self.current_task = self.tasks.popleft()
if self.current_task.cancel_pending:
if self.to_send and self.current_task.status != "init":
data = self.to_send
data = None
# Run a single step with the calculation
method, *args =
self.current_task.status = "run"
self.current_task.steps += 1
# Data has been sent, reset it to None
if self.to_send and self.current_task != "init":
self.to_send = None
# Sneaky method call, thanks to David Beazley for this ;)
getattr(self, method)(*args)
except CancelledError:
self.current_task.status = "end"
self.current_task.cancelled = True
self.current_task.cancel_pending = False
self.join(self.current_task, self.current_task.parent)
except StopIteration as ret:
# Coroutine ends
self.current_task.status = "end"
self.current_task.result = ret.value
self.current_task.finished = True
except BaseException as err:
self.current_task.exc = err
self.current_task.status = "crashed"
self.join(self.current_task, self.current_task.parent)
def do_cancel(self):
Performs task cancellation by throwing CancelledError inside the current
task in order to stop it from executing. The loop continues to execute
as tasks are independent
def get_running(self):
Returns the current task
self.to_send = self.current_task
def trigger_events(self):
Checks for ready or expired events and triggers them
for event, tasks in self.event_waiting.copy().items():
if event.set:
event.event_caught = True
self.tasks.extend(tasks + [event.notifier])
def awake_sleeping(self):
Checks for and reschedules sleeping tasks
wait(max(0.0, self.paused[0][0] - self.clock()))
# Sleep until the closest deadline in order not to waste CPU cycles
while self.paused[0][0] < self.clock():
# Reschedules tasks when their deadline has elapsed
if not self.paused:
def check_io(self):
Checks and schedules task to perform I/O
# If there are no tasks ready wait indefinitely
timeout = 0.0 if self.tasks else None
for key in dict(self.selector.get_map()).values():
if self.selector.get_map():
io_ready =
# Get sockets that are ready and schedule their tasks
for key, _ in io_ready:
self.tasks.append( # Resource ready? Schedule its task
def start(self, func: types.FunctionType, *args):
Starts the event loop from a sync context
entry = Task(func(*args), func.__name__ or str(func))
self.has_ran = True
if entry.exc:
raise entry.exc from None
def reschedule_parent(self):
Reschedules the parent task of the
currently running task, if any
parent = self.current_task.parent
if parent:
return parent
def reschedule_joinee(self):
Reschedules the joinee task of the
currently running task, if any
def join(self, child: types.coroutine, parent):
Handler for the 'join' event, does some magic to tell the scheduler
to wait in the given parent until the current coroutine ends
child.joined = True
if parent:
if child.cancelled or child.exc:
# Task was cancelled or has errored
if child.parent:
elif child.finished:
# if parent:
# self.tasks.append(parent)
def sleep(self, seconds: int or float):
Puts the caller to sleep for a given amount of seconds
if seconds:
self.current_task.status = "sleep"
self.paused.put(self.current_task, seconds)
# TODO: More generic I/O rather than just sockets
def want_read(self, sock: socket.socket):
Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing
self.current_task.status = "I/O"
if self.current_task.last_io:
if self.current_task.last_io == ("READ", sock):
# Socket is already scheduled!
self.current_task.last_io = "READ", sock
self.selector.register(sock, EVENT_READ, self.current_task)
except KeyError:
# The socket is already registered doing something else
raise ResourceBusy("The given resource is busy!") from None
def want_write(self, sock: socket.socket):
Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing
self.current_task.status = "I/O"
if self.current_task.last_io:
if self.current_task.last_io == ("WRITE", sock):
# Socket is already scheduled!
# modify() causes issues
self.current_task.last_io = "WRITE", sock
self.selector.register(sock, EVENT_WRITE, self.current_task)
except KeyError:
raise ResourceBusy("The given resource is busy!") from None
def event_set(self, event):
Sets an event
event.notifier = self.current_task
event.set = True
def event_wait(self, event):
Waits for an event
if event in
event.waiting -= 1
if event.waiting <= 0:
def cancel(self, task):
Handler for the 'cancel' event, sets the task to be cancelled later
task.cancel_pending = True # Cancellation is deferred
def wrap_socket(self, sock):
Wraps a standard socket into an AsyncSocket object
return AsyncSocket(sock, self)
async def read_sock(self, sock: socket.socket, buffer: int):
Reads from a socket asynchronously, waiting until the resource is available and returning up to buffer bytes
from the socket
return sock.recv(buffer)
except WantRead:
await want_read(sock)
return sock.recv(buffer)
async def accept_sock(self, sock: socket.socket):
Accepts a socket connection asynchronously, waiting until the resource is available and returning the
result of the accept() call
return sock.accept()
except WantRead:
await want_read(sock)
return sock.accept()
async def sock_sendall(self, sock: socket.socket, data: bytes):
Sends all the passed data, as bytes, trough the socket asynchronously
while data:
sent_no = sock.send(data)
except WantWrite:
await want_write(sock)
sent_no = sock.send(data)
data = data[sent_no:]
async def close_sock(self, sock: socket.socket):
Closes the socket asynchronously
await want_write(sock)
return sock.close()
async def connect_sock(self, sock: socket.socket, addr: tuple):
Connects a socket asynchronously
try: # "Borrowed" from curio
return sock.connect(addr)
except WantWrite:
await want_write(sock)
err = sock.getsockopt(SOL_SOCKET, SO_ERROR)
if err != 0:
raise OSError(err, f"Connect call failed: {addr}")

View File

@ -15,65 +15,79 @@ limitations under the License.
import types
from ._traps import join, cancel, event_set, event_wait
from .traps import join, cancel, event_set, event_wait
from heapq import heappop, heappush
from .exceptions import GiambioError
from dataclasses import dataclass, field
class Task:
"""A simple wrapper around a coroutine object"""
A simple wrapper around a coroutine object
def __init__(self, coroutine: types.coroutine):
self.coroutine = coroutine
self.cancelled = False # True if the task gets cancelled
self.exc = None
self.result = None
self.finished = False
self.status = "init" # This is useful for cancellation
self._last_io = None
coroutine: types.CoroutineType
name: str
cancelled: bool = False # True if the task gets cancelled
exc: BaseException = None
result: object = None
finished: bool = False
status: str = "init"
steps: int = 0
last_io: tuple = ()
parent: object = None
joined: bool= False
cancel_pending: bool = False
waiters: list = field(default_factory=list)
def run(self, what=None):
"""Simple abstraction layer over the coroutines ``send`` method"""
Simple abstraction layer over coroutines' ``send`` method
return self.coroutine.send(what)
def throw(self, err: Exception):
"""Simple abstraction layer over the coroutines ``throw`` method"""
Simple abstraction layer over coroutines ``throw`` method
return self.coroutine.throw(err)
async def join(self):
"""Joins the task"""
Joins the task
if self.cancelled and not self.exc:
return None
if self.exc:
raise self.exc
res = await join(self)
if self.exc:
raise self.exc
return res
async def cancel(self):
"""Cancels the task"""
Cancels the task
await cancel(self)
# await join(self) # TODO -> Join ourselves after cancellation?
def __repr__(self):
"""Implements repr(self)"""
return f"Task({self.coroutine}, cancelled={self.cancelled}, exc={repr(self.exc)}, result={self.result}, finished={self.finished}, status={self.status})"
def __del__(self):
class Event:
"""A class designed similarly to threading.Event"""
A class designed similarly to threading.Event
def __init__(self):
"""Object constructor"""
Object constructor
self._set = False
self.set = False
self.event_caught = False
self.timeout = None
self.waiting = 0
@ -81,18 +95,20 @@ class Event:
async def set(self):
Sets the event, waking up all tasks that called
pause() on this event
pause() on us
if self._set:
if self.set:
raise GiambioError("The event has already been set")
await event_set(self)
async def pause(self):
"""Waits until the event is set and returns a value"""
Waits until the event is set
self.waiting += 1
return await event_wait(self)
await event_wait(self)
class TimeQueue:
@ -102,6 +118,10 @@ class TimeQueue:
def __init__(self, clock):
Object constructor
self.clock = clock
self.sequence = 0
self.container = []

View File

@ -14,19 +14,49 @@ See the License for the specific language governing permissions and
limitations under the License.
import socket
import threading
from ._core import AsyncScheduler
from ._layers import Task
from ._managers import TaskManager
from .core import AsyncScheduler
from .exceptions import GiambioError
from .context import TaskManager
from .socket import AsyncSocket
from types import FunctionType, CoroutineType, GeneratorType
import socket
thread_local = threading.local()
def run(func: FunctionType, *args) -> Task:
def get_event_loop():
Returns the event loop associated to the current
return thread_local.loop
except AttributeError:
raise GiambioError("no event loop set") from None
def new_event_loop():
Associates a new event loop to the current thread
and deactivates the old one. This should not be
called explicitly unless you know what you're doing
loop = thread_local.loop
except AttributeError:
thread_local.loop = AsyncScheduler()
if not loop.done():
raise GiambioError("cannot set event loop while running")
thread_local.loop = AsyncScheduler()
def run(func: FunctionType, *args):
Starts the event loop from a synchronous entry point
@ -34,11 +64,8 @@ def run(func: FunctionType, *args) -> Task:
if isinstance(func, (CoroutineType, GeneratorType)):
raise RuntimeError("Looks like you tried to call, arg2, ...)), that is wrong!"
"\nWhat you wanna do, instead, is this:, arg1, arg2, ...)")
return thread_local.loop.start(func, *args)
except AttributeError:
thread_local.loop = AsyncScheduler()
return thread_local.loop.start(func, *args)
thread_local.loop.start(func, *args)
def clock():

View File

@ -20,8 +20,9 @@ limitations under the License.
import socket
from .exceptions import ResourceClosed
from ._traps import sleep
from .traps import sleep
# Stolen from curio
from ssl import SSLWantReadError, SSLWantWriteError
WantRead = (BlockingIOError, InterruptedError, SSLWantReadError)
@ -32,7 +33,9 @@ except ImportError:
class AsyncSocket(object):
"""Abstraction layer for asynchronous sockets"""
Abstraction layer for asynchronous sockets
def __init__(self, sock: socket.socket, loop):
self.sock = sock
@ -41,46 +44,54 @@ class AsyncSocket(object):
self._closed = False
async def receive(self, max_size: int):
"""Receives up to max_size from a socket asynchronously"""
Receives up to max_size from a socket asynchronously
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
self.loop.current_task.status = "I/O"
return await self.loop._read_sock(self.sock, max_size)
return await self.loop.read_sock(self.sock, max_size)
async def accept(self):
"""Accepts the socket, completing the 3-step TCP handshake asynchronously"""
Accepts the socket, completing the 3-step TCP handshake asynchronously
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
to_wrap = await self.loop._accept_sock(self.sock)
to_wrap = await self.loop.accept_sock(self.sock)
return self.loop.wrap_socket(to_wrap[0]), to_wrap[1]
async def send_all(self, data: bytes):
"""Sends all data inside the buffer asynchronously until it is empty"""
Sends all data inside the buffer asynchronously until it is empty
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
return await self.loop._sock_sendall(self.sock, data)
return await self.loop.sock_sendall(self.sock, data)
async def close(self):
"""Closes the socket asynchronously"""
Closes the socket asynchronously
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
await sleep(0) # Give the scheduler the time to unregister the socket first
await self.loop._close_sock(self.sock)
await self.loop.close_sock(self.sock)
self._closed = True
async def connect(self, addr: tuple):
"""Connects the socket to an endpoint"""
Connects the socket to an endpoint
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
await self.loop._connect_sock(self.sock, addr)
await self.loop.connect_sock(self.sock, addr)
async def __aenter__(self):
return self.sock.__enter__()
return self
async def __aexit__(self, *_):
await self.close()

giambio/ Normal file
View File

@ -0,0 +1,134 @@
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
# Implementation for all giambio traps, which are hooks
# into the event loop and allow it to switch tasks
# These coroutines are the one and only way to interact
# with the event loop from the user's perspective, and
# the entire library is based on these traps
import types
def create_trap(method, *args):
Creates and yields a trap. This
is the lowest-level method to
interact with the event loop
data = yield method, *args
return data
async def sleep(seconds: int):
Pause the execution of an async function for a given amount of seconds,
without blocking the entire event loop, which keeps watching for other events
This function is also useful as a sort of checkpoint, because it returns
control to the scheduler, which can then switch to another task. If your code
doesn't have enough calls to async functions (or 'checkpoints') this might
prevent the scheduler from switching tasks properly. If you feel like this
happens in your code, try adding a call to giambio.sleep(0) somewhere.
This will act as a checkpoint without actually pausing the execution
of your function, but it will allow the scheduler to switch tasks
:param seconds: The amount of seconds to sleep for
:type seconds: int
assert seconds >= 0, "The time delay can't be negative"
await create_trap("sleep", seconds)
async def current_task():
Gets the currently running task
return await create_trap("get_running")
async def join(task):
Awaits a given task for completion
:param task: The task to join
:type task: class: Task
return await create_trap("join", task, await current_task())
async def cancel(task):
Cancels the given task
The concept of cancellation is tricky, because there is no real way to 'stop'
a task if not by raising an exception inside it and ignoring whatever it
returns (and also hoping that the task won't cause collateral damage). It
is highly recommended that when you write async code you take into account
that it might be cancelled at any time. You might think to just ignore the
cancellation exception and be done with it, but doing so *will* break your
code, so if you really wanna do that be sure to re-raise it when done!
await create_trap("cancel", task)
assert task.cancelled, f"Coroutine ignored CancelledError"
async def want_read(stream):
Notifies the event loop that a task that wants to read from the given
:param stream: The resource that needs to be read
await create_trap("want_read", stream)
async def want_write(stream):
Notifies the event loop that a task that wants to read from the given
:param stream: The resource that needs to be written
await create_trap("want_write", stream)
async def event_set(event):
Communicates to the loop that the given event object
must be set. This is important as the loop constantly
checks for active events to deliver them
await create_trap("event_set", event)
async def event_wait(event):
Notifies the event loop that the current task has to wait
for given event to trigger
await create_trap("event_wait", event)

View File

@ -9,7 +9,7 @@ async def countdown(n: int):
print(f"Down {n}")
n -= 1
await giambio.sleep(1)
# raise Exception("oh no man") # Uncomment to test propagation
# raise Exception("oh no man") # Uncomment to test propagation
print("Countdown over")
return 0
@ -18,7 +18,7 @@ async def countup(stop: int, step: int = 1):
x = 0
while x < stop:
print(f"Up {x}")
x += 1
x += step
await giambio.sleep(step)
print("Countup over")
return 1
@ -26,30 +26,13 @@ async def countup(stop: int, step: int = 1):
async def main():
print("Creating an async pool")
async with giambio.create_pool() as pool:
print("Starting counters")
pool.spawn(countdown, 10)
count_up = pool.spawn(countup, 5, 2)
# raise Exception
# Raising an exception here has a weird
# Behavior: The exception is propagated
# *after* all the child tasks complete,
# which is not what we want
# print("Sleeping for 2 seconds before cancelling")
# await giambio.sleep(2)
# await count_up.cancel() # TODO: Cancel _is_ broken, this does not re-schedule the parent!
# print("Cancelled countup")
print("Task execution complete")
pool.spawn(countup, 5, 2)
except Exception as e:
print(f"Caught this bad boy in here, propagating it -> {type(e).__name__}: {e}")
print(f"Got -> {type(e).__name__}: {e}")
print("Task execution complete")
if __name__ == "__main__":
print("Starting event loop")
except BaseException as error:
print(f"Exception caught from main event loop! -> {type(error).__name__}: {error}")
print("Event loop done")

View File

@ -3,6 +3,7 @@ from giambio.socket import AsyncSocket
import socket
import logging
import sys
import traceback
# A test to check for asynchronous I/O
@ -15,14 +16,10 @@ async def serve(address: tuple):
asock = giambio.wrap_socket(sock) # We make the socket an async socket"Serving asynchronously at {address[0]}:{address[1]}")
while True:
async with giambio.create_pool() as pool:
conn, addr = await asock.accept()"{addr[0]}:{addr[1]} connected")
pool.spawn(handler, conn, addr)
except TypeError:
print("Looks like we have a naughty boy here!")
async with giambio.create_pool() as pool:
conn, addr = await asock.accept()"{addr[0]}:{addr[1]} connected")
pool.spawn(handler, conn, addr)
async def handler(sock: AsyncSocket, addr: tuple):
@ -51,6 +48,7 @@ if __name__ == "__main__":
try:, ("localhost", port))
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
if isinstance(error, KeyboardInterrupt):"Ctrl+C detected, exiting")