Added chatroom example and initial work on (semi-broken) async Queue implementation

Nocturn9x 2022-02-04 20:19:48 +01:00
parent cd2a436d3d
commit a0d376bb35
10 changed files with 239 additions and 51 deletions

View File

@ -20,9 +20,9 @@ __author__ = "Nocturn9x"
__version__ = (0, 0, 1)
from . import exceptions, socket, context, core, task, io
from giambio import exceptions, socket, context, core, task, io
from giambio.traps import sleep, current_task
from giambio.sync import Event
from giambio.sync import Event, Queue
from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after
from giambio.util import debug
@ -33,6 +33,7 @@ __all__ = [
"context",
"sleep",
"Event",
"Queue",
"run",
"clock",
"create_pool",
@ -42,4 +43,7 @@ __all__ = [
"new_event_loop",
"debug",
"skip_after",
"task",
"io",
"socket"
]
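
Note (not part of the diff): the newly exported Queue is used like the rest of the public API; a minimal sketch, modelled on the tests/queue.py example added further down (the main/worker names are illustrative):

import giambio


async def worker(queue: giambio.Queue):
    # Consume items until the None sentinel arrives
    while (item := await queue.get()) is not None:
        print(f"got {item!r}")


async def main():
    queue = giambio.Queue(2)  # bounded queue, maxsize=2
    async with giambio.create_pool() as pool:
        await pool.spawn(worker, queue)
        await queue.put("hello")
        await queue.put(None)  # tell the worker to stop


giambio.run(main)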

View File

@ -235,7 +235,6 @@ class AsyncScheduler:
self.current_task.result = ret.value
self.current_task.finished = True
self.join(self.current_task)
self.tasks.remove(self.current_task)
except BaseException as err:
# Our handy join mechanism will handle all the hassle of
# rescheduling joiners and propagating errors, so we
@ -243,8 +242,7 @@ class AsyncScheduler:
# self.join() work its magic
self.current_task.exc = err
self.join(self.current_task)
if self.current_task in self.tasks:
self.tasks.remove(self.current_task)
def create_task(self, corofunc: types.FunctionType, pool, *args, **kwargs) -> Task:
"""
@ -344,19 +342,19 @@ class AsyncScheduler:
self.io_release(k.fileobj)
task.last_io = ()
def suspend(self, task: Task):
def suspend(self):
"""
Suspends execution of the given task. This is basically
Suspends execution of the current task. This is basically
a do-nothing method, since it will not reschedule the task
before returning. The task will stay suspended as long as
something else outside the loop calls a trap to reschedule it.
This method will unregister any I/O as well to ensure the task
isn't rescheduled in further calls to select()
Any pending I/O for the task is temporarily unscheduled to
prevent a previous network operation from rescheduling the task
before it's due
"""
if task.last_io:
self.io_release_task(task)
if self.current_task.last_io:
self.io_release_task(self.current_task)
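
Note (not part of the diff): the new Queue in sync.py is the first user of this suspend semantics. The handoff between a suspended task and the one that wakes it up looks roughly like this (a sketch mirroring Queue.get()/put(); wait_for_value/hand_over are illustrative names, and loop._data is the same loop-internal mapping the Queue pokes):

from collections import deque
from giambio.traps import current_task, current_loop, suspend, schedule_tasks

waiters = deque()


async def wait_for_value():
    # Park the current task until somebody hands it a value
    waiters.append(await current_task())
    return await suspend()  # resumes with whatever the waker stored for us


async def hand_over(value):
    # Wake the oldest waiter and deliver the value as suspend()'s return value
    task = waiters.popleft()
    loop = await current_loop()
    loop._data[task] = value
    await schedule_tasks([task])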
def reschedule_running(self):
"""
@ -450,6 +448,8 @@ class AsyncScheduler:
:param tasks: The list of task objects to schedule
"""
for task in tasks:
self.paused.discard(task)
self.run_ready.extend(tasks)
self.reschedule_running()
@ -639,11 +639,11 @@ class AsyncScheduler:
self.debugger.on_task_exit(task)
if task.last_io:
self.io_release_task(task)
# If the pool has finished executing or we're at the first parent
# task that kicked the loop, we can safely reschedule the parent(s)
if task.pool is None:
return
if task.pool.done():
# If the pool has finished executing or we're at the first parent
# task that kicked the loop, we can safely reschedule the parent(s)
self.reschedule_joiners(task)
elif task.exc:
task.status = "crashed"
@ -651,13 +651,13 @@ class AsyncScheduler:
# TODO: We might want to do a bit more complex traceback hacking to remove any extra
# frames from the exception call stack, but for now removing at least the first one
# seems a sensible approach (it's us catching it so we don't care about that)
task.exc.__traceback__ = task.exc.__traceback__.tb_next
if task.last_io:
self.io_release_task(task)
for _ in range(5):
if task.exc.__traceback__.tb_next:
task.exc.__traceback__ = task.exc.__traceback__.tb_next
self.debugger.on_exception_raised(task, task.exc)
if task.pool is None:
if task.pool is None or task is self.entry_point:
# Parent task has no pool, so we propagate
raise
raise task.exc
if self.cancel_pool(task.pool):
# This will reschedule the parent(s)
# only if all the tasks inside the task's
@ -671,6 +671,9 @@ class AsyncScheduler:
except (StopIteration, CancelledError, RuntimeError):
# TODO: Need anything else?
task.joiners.remove(t)
finally:
if t in self.tasks:
self.tasks.remove(t)
self.reschedule_joiners(task)
def sleep(self, seconds: int or float):
@ -678,8 +681,8 @@ class AsyncScheduler:
Puts the current task to sleep for a given amount of seconds
"""
self.debugger.before_sleep(self.current_task, seconds)
if seconds:
self.debugger.before_sleep(self.current_task, seconds)
self.current_task.status = "sleep"
self.current_task.sleep_start = self.clock()
self.paused.put(self.current_task, seconds)
@ -703,7 +706,7 @@ class AsyncScheduler:
# The task isn't running already!
task.cancel_pending = False
return
elif task.status in ("io", "sleep"):
elif task.status in ("io", "sleep", "run"):
# We cancel immediately only in a context where it's safer to do
# so. The concept of "safer" is quite tricky, because even though the
# task is technically not running, it might leave some unfinished state
@ -736,10 +739,6 @@ class AsyncScheduler:
# If the task ignores our exception, we'll
# raise it later again
task.cancel_pending = True
else:
# If we can't cancel in a somewhat "graceful" way, we just
# defer this operation for later (check run() for more info)
task.cancel_pending = True # Cancellation is deferred
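
Note (not part of the diff): cancellation is requested from user code via Task.cancel(). A minimal usage sketch, assuming pool.spawn returns the Task object as it does in the other tests:

import giambio


async def sleeper():
    await giambio.sleep(10)
    print("never reached if we get cancelled")


async def main():
    async with giambio.create_pool() as pool:
        task = await pool.spawn(sleeper)  # assumption: spawn returns the Task object
        await giambio.sleep(1)
        await task.cancel()  # sleeper is in the "sleep" state, so it's cancelled immediately


giambio.run(main)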
def register_sock(self, sock, evt_type: str):
"""

View File

@ -242,6 +242,32 @@ class AsyncSocket:
await want_write(self.sock)
except WantRead:
await want_read(self.sock)
async def getpeername(self):
"""
Wrapper socket method
"""
while True:
try:
return self.sock.getpeername()
except WantWrite:
await want_write(self.sock)
except WantRead:
await want_read(self.sock)
async def getsockname(self):
"""
Wrapper socket method
"""
while True:
try:
return self.sock.getsockname()
except WantWrite:
await want_write(self.sock)
except WantRead:
await want_read(self.sock)
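
Note (not part of the diff): both new wrappers repeat the retry-until-ready idiom used throughout AsyncSocket. A possible follow-up refactor (just a sketch, using the names already imported by this module) would factor it out into a helper method:

async def _call_when_ready(self, func, *args):
    """
    Sketch: retry a non-blocking socket call until the underlying
    socket is ready, yielding to the event loop in the meantime
    """
    while True:
        try:
            return func(*args)
        except WantWrite:
            await want_write(self.sock)
        except WantRead:
            await want_read(self.sock)

getpeername() would then reduce to "return await self._call_when_ready(self.sock.getpeername)".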
async def recvmsg(self, bufsize, ancbufsize=0, flags=0):
"""

View File

@ -15,14 +15,16 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from typing import Any
from giambio.traps import event_wait, event_set
from collections import deque
from typing import Any, Optional
from giambio.traps import event_wait, event_set, current_task, suspend, schedule_tasks, current_loop
from giambio.exceptions import GiambioError
class Event:
"""
A class designed similarly to threading.Event
A class designed similarly to threading.Event (not
thread-safe though)
"""
def __init__(self):
@ -53,20 +55,55 @@ class Event:
class Queue:
"""
An asynchronous queue similar to asyncio.Queue.
NOT thread safe!
An asynchronous FIFO queue similar to asyncio.Queue
that uses a collections.deque object for the underlying
data representation. This queue is *NOT* thread-safe
"""
def __init__(self, maxsize: int):
def __init__(self, maxsize: Optional[int] = None):
"""
Object constructor
"""
self.events = {}
self.container = []
self.maxsize = maxsize
self.getters = deque()
self.putters = deque()
self.container = deque(maxlen=maxsize)
async def put(self, item: Any):
"""
Pushes an element onto the queue. If the
queue is full, waits until there's
enough space in the queue
"""
if not self.maxsize or len(self.container) < self.maxsize:
if self.getters:
task = self.getters.popleft()
loop = await current_loop()
loop._data[task] = item
await schedule_tasks([task])
else:
self.container.append(item)
else:
self.putters.append(await current_task())
await suspend()
async def get(self) -> Any:
"""
Pops an element off the queue. If the queue
is empty, blocks until an element is put
onto it
"""
if self.container:
if self.putters:
await schedule_tasks([self.putters.popleft()])
return self.container.popleft()
else:
self.getters.append(await current_task())
return await suspend()
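
Note (not part of the diff): as the commit message anticipates, the blocked-putter path is still semi-broken: when the queue is full, put() parks the current task and suspends, but once get() frees a slot and reschedules it, the item is never actually stored. One possible fix (a sketch, not what this commit does) is to append after resuming:

async def put(self, item: Any):
    """
    Same as above, except that the blocked-putter path stores the
    item once get() has made room and rescheduled us
    """
    if not self.maxsize or len(self.container) < self.maxsize:
        if self.getters:
            task = self.getters.popleft()
            loop = await current_loop()
            loop._data[task] = item
            await schedule_tasks([task])
        else:
            self.container.append(item)
    else:
        self.putters.append(await current_task())
        await suspend()
        # get() has already popped an element, so there is room now
        self.container.append(item)

Note also that since container is a deque created with maxlen=maxsize, appending while full silently drops the oldest element rather than raising, so the capacity check in put() is what actually protects the data.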

View File

@ -30,16 +30,18 @@ class Task:
"""
# The name of the task. Usually this equals self.coroutine.__name__,
# but in some cases it falls back to repr(self.coroutine)
# but it may fall back to repr(self.coroutine)
name: str
# The underlying coroutine object to wrap around a giambio task
coroutine: Coroutine
# The async pool that spawned this task. The one and only task that hasn't
# an associated pool is the main entry point which is not available externally
# The async pool that spawned this task. The one and only task which may have
# no associated pool is the main entry point which is not available externally
# (but if a pool is started in the main task, it somewhat becomes part of that
# pool as its parent)
pool: Union["giambio.context.TaskManager", None] = None
# Whether the task has been cancelled or not. This is True both when the task is
# explicitly cancelled via its cancel() method or when it is cancelled as a result
# of an exception in another task in the same pool
# of an exception in another task
cancelled: bool = False
# This attribute will be None unless the task raised an error
exc: BaseException = None
@ -51,10 +53,10 @@ class Task:
# time by the event loop, internally. Possible values for this are "init"--
# when the task has been created but not started running yet--, "run"-- when
# the task is running synchronous code--, "io"-- when the task is waiting on
# an I/O resource--, "sleep"-- when the task is either asleep or waiting on
# an event, "crashed"-- when the task has exited because of an exception
# and "cancelled" when-- when the task has been explicitly cancelled with
# its cancel() method or as a result of an exception
# an I/O resource--, "sleep"-- when the task is either asleep, waiting on
# an event or otherwise suspended--, "crashed"-- when the task has exited because
# of an exception-- and "cancelled"-- when the task has been explicitly cancelled
# with its cancel() method or as a result of an exception
status: str = "init"
# This attribute counts how many times the task's run() method has been called
steps: int = 0
@ -106,13 +108,10 @@ class Task:
are propagated as well
"""
task = await giambio.traps.current_task()
if task:
if task := await giambio.traps.current_task():
self.joiners.add(task)
res = await giambio.traps.join(self)
if self.exc:
raise self.exc
return res
return await giambio.traps.join(self)
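
Note (not part of the diff): with this change join() simply returns whatever the join trap reports instead of re-raising here. A minimal usage sketch, under the same assumptions as above (pool.spawn returns the Task object) plus the assumption that the trap still hands back the task's return value, as the removed res variable suggests:

import giambio


async def answer():
    await giambio.sleep(1)
    return 42


async def main():
    async with giambio.create_pool() as pool:
        task = await pool.spawn(answer)  # assumption: spawn returns the Task object
        print(await task.join())  # assumption: prints 42 once answer() completes


giambio.run(main)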
async def cancel(self):
"""

View File

@ -25,7 +25,7 @@ import types
import inspect
from giambio.task import Task
from types import FunctionType
from typing import Union, Iterable
from typing import Any, Union, Iterable
from giambio.exceptions import GiambioError
@ -41,6 +41,14 @@ def create_trap(method, *args):
return data
async def suspend() -> Any:
"""
Suspends the current task
"""
return await create_trap("suspend")
async def create_task(coro: FunctionType, pool, *args):
"""
Spawns a new task in the current event loop from a bare coroutine

tests/chatroom.py (new file, 80 lines)
View File

@ -0,0 +1,80 @@
from typing import List
import giambio
from giambio.socket import AsyncSocket
import logging
import sys
# An asynchronous chatroom
clients: List[giambio.socket.AsyncSocket] = []
async def serve(bind_address: tuple):
"""
Serves asynchronously forever
:param bind_address: The address to bind the server to represented as a tuple
(address, port) where address is a string and port is an integer
"""
sock = giambio.socket.socket()
await sock.bind(bind_address)
await sock.listen(5)
logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}")
async with giambio.create_pool() as pool:
async with sock:
while True:
try:
conn, address_tuple = await sock.accept()
clients.append(conn)
logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected")
await pool.spawn(handler, conn, address_tuple)
except Exception as err:
# Because exceptions just *work*
logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}")
async def handler(sock: AsyncSocket, client_address: tuple):
"""
Handles a single client connection
:param sock: The AsyncSocket object connected to the client
:param client_address: The client's address represented as a tuple
(address, port) where address is a string and port is an integer
:type client_address: tuple
"""
address = f"{client_address[0]}:{client_address[1]}"
async with sock: # Closes the socket automatically
await sock.send_all(b"Welcome to the chartoom pal, start typing and press enter!\n")
while True:
await sock.send_all(b"-> ")
data = await sock.receive(1024)
if not data:
break
elif data == b"exit\n":
await sock.send_all(b"I'm dead dude\n")
raise TypeError("Oh, no, I'm gonna die!")
logging.info(f"Got: {data!r} from {address}")
for i, client_sock in enumerate(clients):
logging.info(f"Sending {data!r} to {':'.join(map(str, await client_sock.getpeername()))}")
if client_sock != sock:
await client_sock.send_all(data)
logging.info(f"Echoed back {data!r} to {i} clients")
logging.info(f"Connection from {address} closed")
if __name__ == "__main__":
port = int(sys.argv[1]) if len(sys.argv) > 1 else 1501
logging.basicConfig(
level=20,
format="[%(levelname)s] %(asctime)s %(message)s",
datefmt="%d/%m/%Y %p",
)
try:
giambio.run(serve, ("localhost", port))
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
if isinstance(error, KeyboardInterrupt):
logging.info("Ctrl+C detected, exiting")
else:
logging.error(f"Exiting due to a {type(error).__name__}: {error}")

tests/queue.py (new file, 31 lines)
View File

@ -0,0 +1,31 @@
import giambio
from debugger import Debugger
async def producer(q: giambio.Queue, n: int):
for i in range(n):
await q.put(i)
print(f"Produced {i}")
await q.put(None)
print("Producer done")
async def consumer(q: giambio.Queue):
while True:
item = await q.get()
if item is None:
print("Consumer done")
break
print(f"Consumed {item}")
await giambio.sleep(3)
async def main(q: giambio.Queue, n: int):
async with giambio.create_pool() as pool:
await pool.spawn(producer, q, n)
await pool.spawn(consumer, q)
queue = giambio.Queue(1)
giambio.run(main, queue, 5, debugger=())

View File

@ -32,7 +32,11 @@ async def main():
try:
async with giambio.create_pool() as pool:
# This pool will run until completion of its
# tasks and then propagate the exception
# tasks and then propagate the exception. This is
# because exceptions in nested pools are propagated
# all the way down first, then the pools above the
# one that raised the error wait for their own
# children to complete and only then re-raise the original exception
await pool.spawn(child)
await pool.spawn(child)
print("[main] First 2 children spawned, awaiting completion")