From a0d376bb35eddfcebad7f79d6a15f06bebf155cb Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Fri, 4 Feb 2022 20:19:48 +0100 Subject: [PATCH] Added chatroom example and initial work on (semi-broken) async Queue implementation --- giambio/__init__.py | 8 ++- giambio/core.py | 47 +++++++++-------- giambio/io.py | 26 ++++++++++ giambio/sync.py | 55 ++++++++++++++++---- giambio/task.py | 27 +++++----- giambio/traps.py | 10 +++- tests/chatroom.py | 80 +++++++++++++++++++++++++++++ tests/{server.py => echo_server.py} | 0 tests/queue.py | 31 +++++++++++ tests/triple_nested_exception.py | 6 ++- 10 files changed, 239 insertions(+), 51 deletions(-) create mode 100644 tests/chatroom.py rename tests/{server.py => echo_server.py} (100%) create mode 100644 tests/queue.py diff --git a/giambio/__init__.py b/giambio/__init__.py index 0dc5923..0ced5c2 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -20,9 +20,9 @@ __author__ = "Nocturn9x" __version__ = (0, 0, 1) -from . import exceptions, socket, context, core, task, io +from giambio import exceptions, socket, context, core, task, io from giambio.traps import sleep, current_task -from giambio.sync import Event +from giambio.sync import Event, Queue from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after from giambio.util import debug @@ -33,6 +33,7 @@ __all__ = [ "context", "sleep", "Event", + "Queue", "run", "clock", "create_pool", @@ -42,4 +43,7 @@ __all__ = [ "new_event_loop", "debug", "skip_after", + "task", + "io", + "socket" ] diff --git a/giambio/core.py b/giambio/core.py index 7023ce4..de77088 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -235,7 +235,6 @@ class AsyncScheduler: self.current_task.result = ret.value self.current_task.finished = True self.join(self.current_task) - self.tasks.remove(self.current_task) except BaseException as err: # Our handy join mechanism will handle all the hassle of # rescheduling joiners and propagating errors, so we @@ -243,8 +242,7 @@ class AsyncScheduler: # self.join() work its magic self.current_task.exc = err self.join(self.current_task) - if self.current_task in self.tasks: - self.tasks.remove(self.current_task) + def create_task(self, corofunc: types.FunctionType, pool, *args, **kwargs) -> Task: """ @@ -344,19 +342,19 @@ class AsyncScheduler: self.io_release(k.fileobj) task.last_io = () - def suspend(self, task: Task): + def suspend(self): """ - Suspends execution of the given task. This is basically + Suspends execution of the current task. This is basically a do-nothing method, since it will not reschedule the task before returning. The task will stay suspended as long as something else outside the loop calls a trap to reschedule it. - - This method will unregister any I/O as well to ensure the task - isn't rescheduled in further calls to select() + Any pending I/O for the task is temporarily unscheduled to + avoid some previous network operation to reschedule the task + before it's due """ - - if task.last_io: - self.io_release_task(task) + + if self.current_task.last_io: + self.io_release_task(self.current_task) def reschedule_running(self): """ @@ -450,6 +448,8 @@ class AsyncScheduler: :param tasks: The list of task objects to schedule """ + for task in tasks: + self.paused.discard(task) self.run_ready.extend(tasks) self.reschedule_running() @@ -639,11 +639,11 @@ class AsyncScheduler: self.debugger.on_task_exit(task) if task.last_io: self.io_release_task(task) + # If the pool has finished executing or we're at the first parent + # task that kicked the loop, we can safely reschedule the parent(s) if task.pool is None: return if task.pool.done(): - # If the pool has finished executing or we're at the first parent - # task that kicked the loop, we can safely reschedule the parent(s) self.reschedule_joiners(task) elif task.exc: task.status = "crashed" @@ -651,13 +651,13 @@ class AsyncScheduler: # TODO: We might want to do a bit more complex traceback hacking to remove any extra # frames from the exception call stack, but for now removing at least the first one # seems a sensible approach (it's us catching it so we don't care about that) - task.exc.__traceback__ = task.exc.__traceback__.tb_next - if task.last_io: - self.io_release_task(task) + for _ in range(5): + if task.exc.__traceback__.tb_next: + task.exc.__traceback__ = task.exc.__traceback__.tb_next self.debugger.on_exception_raised(task, task.exc) - if task.pool is None: + if task.pool is None or task is self.entry_point: # Parent task has no pool, so we propagate - raise + raise task.exc if self.cancel_pool(task.pool): # This will reschedule the parent(s) # only if all the tasks inside the task's @@ -671,6 +671,9 @@ class AsyncScheduler: except (StopIteration, CancelledError, RuntimeError): # TODO: Need anything else? task.joiners.remove(t) + finally: + if t in self.tasks: + self.tasks.remove(t) self.reschedule_joiners(task) def sleep(self, seconds: int or float): @@ -678,8 +681,8 @@ class AsyncScheduler: Puts the current task to sleep for a given amount of seconds """ - self.debugger.before_sleep(self.current_task, seconds) if seconds: + self.debugger.before_sleep(self.current_task, seconds) self.current_task.status = "sleep" self.current_task.sleep_start = self.clock() self.paused.put(self.current_task, seconds) @@ -703,7 +706,7 @@ class AsyncScheduler: # The task isn't running already! task.cancel_pending = False return - elif task.status in ("io", "sleep"): + elif task.status in ("io", "sleep", "run"): # We cancel immediately only in a context where it's safer to do # so. The concept of "safer" is quite tricky, because even though the # task is technically not running, it might leave some unfinished state @@ -736,10 +739,6 @@ class AsyncScheduler: # If the task ignores our exception, we'll # raise it later again task.cancel_pending = True - else: - # If we can't cancel in a somewhat "graceful" way, we just - # defer this operation for later (check run() for more info) - task.cancel_pending = True # Cancellation is deferred def register_sock(self, sock, evt_type: str): """ diff --git a/giambio/io.py b/giambio/io.py index 8653a72..01a853a 100644 --- a/giambio/io.py +++ b/giambio/io.py @@ -242,6 +242,32 @@ class AsyncSocket: await want_write(self.sock) except WantRead: await want_read(self.sock) + + async def getpeername(self): + """ + Wrapper socket method + """ + + while True: + try: + return self.sock.getpeername() + except WantWrite: + await want_write(self.sock) + except WantRead: + await want_read(self.sock) + + async def getsockname(self): + """ + Wrapper socket method + """ + + while True: + try: + return self.sock.getpeername() + except WantWrite: + await want_write(self.sock) + except WantRead: + await want_read(self.sock) async def recvmsg(self, bufsize, ancbufsize=0, flags=0): """ diff --git a/giambio/sync.py b/giambio/sync.py index ae6582b..2bb8d2e 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -15,14 +15,16 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -from typing import Any -from giambio.traps import event_wait, event_set +from collections import deque +from typing import Any, Optional +from giambio.traps import event_wait, event_set, current_task, suspend, schedule_tasks, current_loop from giambio.exceptions import GiambioError class Event: """ - A class designed similarly to threading.Event + A class designed similarly to threading.Event (not + thread-safe though) """ def __init__(self): @@ -53,20 +55,55 @@ class Event: class Queue: """ - An asynchronous queue similar to asyncio.Queue. - NOT thread safe! + An asynchronous FIFO queue similar to asyncio.Queue + that uses a collections.deque object for the underlying + data representation. This queue is *NOT* thread-safe + """ - def __init__(self, maxsize: int): + def __init__(self, maxsize: Optional[int] = None): """ Object constructor """ - self.events = {} - self.container = [] + self.maxsize = maxsize + self.getters = deque() + self.putters = deque() + self.container = deque(maxlen=maxsize) async def put(self, item: Any): """ - + Pushes an element onto the queue. If the + queue is full, waits until there's + enough space for the queue """ + + if not self.maxsize or len(self.container) < self.maxsize: + if self.getters: + task = self.getters.popleft() + loop = await current_loop() + loop._data[task] = item + await schedule_tasks([task]) + else: + self.container.append(item) + else: + self.putters.append(await current_task()) + print(self.putters) + await suspend() + + + async def get(self) -> Any: + """ + Pops an element off the queue. Blocks until + an element is put onto it again if the queue + is empty + """ + + if self.container: + if self.putters: + await schedule_tasks([self.putters.popleft()]) + return self.container.popleft() + else: + self.getters.append(await current_task()) + return await suspend() \ No newline at end of file diff --git a/giambio/task.py b/giambio/task.py index 804c802..899fc42 100644 --- a/giambio/task.py +++ b/giambio/task.py @@ -30,16 +30,18 @@ class Task: """ # The name of the task. Usually this equals self.coroutine.__name__, - # but in some cases it falls back to repr(self.coroutine) + # but it may fall back to repr(self.coroutine) name: str # The underlying coroutine object to wrap around a giambio task coroutine: Coroutine - # The async pool that spawned this task. The one and only task that hasn't - # an associated pool is the main entry point which is not available externally + # The async pool that spawned this task. The one and only task which may have + # no associated pool is the main entry point which is not available externally + # (but if a pool is started in the main task, it somewhat becomes part of that + # pool as its parent) pool: Union["giambio.context.TaskManager", None] = None # Whether the task has been cancelled or not. This is True both when the task is # explicitly cancelled via its cancel() method or when it is cancelled as a result - # of an exception in another task in the same pool + # of an exception in another task cancelled: bool = False # This attribute will be None unless the task raised an error exc: BaseException = None @@ -51,10 +53,10 @@ class Task: # time by the event loop, internally. Possible values for this are "init"-- # when the task has been created but not started running yet--, "run"-- when # the task is running synchronous code--, "io"-- when the task is waiting on - # an I/O resource--, "sleep"-- when the task is either asleep or waiting on - # an event, "crashed"-- when the task has exited because of an exception - # and "cancelled" when-- when the task has been explicitly cancelled with - # its cancel() method or as a result of an exception + # an I/O resource--, "sleep"-- when the task is either asleep, waiting on + # an event or otherwise suspended, "crashed"-- when the task has exited because + # of an exception and "cancelled" when-- when the task has been explicitly cancelled + # with its cancel() method or as a result of an exception status: str = "init" # This attribute counts how many times the task's run() method has been called steps: int = 0 @@ -106,13 +108,10 @@ class Task: are propagated as well """ - task = await giambio.traps.current_task() - if task: + if task := await giambio.traps.current_task(): self.joiners.add(task) - res = await giambio.traps.join(self) - if self.exc: - raise self.exc - return res + return await giambio.traps.join(self) + async def cancel(self): """ diff --git a/giambio/traps.py b/giambio/traps.py index 44f906b..fc81665 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -25,7 +25,7 @@ import types import inspect from giambio.task import Task from types import FunctionType -from typing import Union, Iterable +from typing import Any, Union, Iterable from giambio.exceptions import GiambioError @@ -41,6 +41,14 @@ def create_trap(method, *args): return data +async def suspend() -> Any: + """ + Suspends the current task + """ + + return await create_trap("suspend") + + async def create_task(coro: FunctionType, pool, *args): """ Spawns a new task in the current event loop from a bare coroutine diff --git a/tests/chatroom.py b/tests/chatroom.py new file mode 100644 index 0000000..5aec80a --- /dev/null +++ b/tests/chatroom.py @@ -0,0 +1,80 @@ +from typing import List +import giambio +from giambio.socket import AsyncSocket +import logging +import sys + +# An asynchronous chatroom + +clients: List[giambio.socket.AsyncSocket] = [] + + +async def serve(bind_address: tuple): + """ + Serves asynchronously forever + + :param bind_address: The address to bind the server to represented as a tuple + (address, port) where address is a string and port is an integer + """ + + sock = giambio.socket.socket() + await sock.bind(bind_address) + await sock.listen(5) + logging.info(f"Serving asynchronously at {bind_address[0]}:{bind_address[1]}") + async with giambio.create_pool() as pool: + async with sock: + while True: + try: + conn, address_tuple = await sock.accept() + clients.append(conn) + logging.info(f"{address_tuple[0]}:{address_tuple[1]} connected") + await pool.spawn(handler, conn, address_tuple) + except Exception as err: + # Because exceptions just *work* + logging.info(f"{address_tuple[0]}:{address_tuple[1]} has raised {type(err).__name__}: {err}") + + +async def handler(sock: AsyncSocket, client_address: tuple): + """ + Handles a single client connection + + :param sock: The AsyncSocket object connected to the client + :param client_address: The client's address represented as a tuple + (address, port) where address is a string and port is an integer + :type client_address: tuple + """ + + address = f"{client_address[0]}:{client_address[1]}" + async with sock: # Closes the socket automatically + await sock.send_all(b"Welcome to the chartoom pal, start typing and press enter!\n") + while True: + await sock.send_all(b"-> ") + data = await sock.receive(1024) + if not data: + break + elif data == b"exit\n": + await sock.send_all(b"I'm dead dude\n") + raise TypeError("Oh, no, I'm gonna die!") + logging.info(f"Got: {data!r} from {address}") + for i, client_sock in enumerate(clients): + logging.info(f"Sending {data!r} to {':'.join(map(str, await client_sock.getpeername()))}") + if client_sock != sock: + await client_sock.send_all(data) + logging.info(f"Echoed back {data!r} to {i} clients") + logging.info(f"Connection from {address} closed") + + +if __name__ == "__main__": + port = int(sys.argv[1]) if len(sys.argv) > 1 else 1501 + logging.basicConfig( + level=20, + format="[%(levelname)s] %(asctime)s %(message)s", + datefmt="%d/%m/%Y %p", + ) + try: + giambio.run(serve, ("localhost", port)) + except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! + if isinstance(error, KeyboardInterrupt): + logging.info("Ctrl+C detected, exiting") + else: + logging.error(f"Exiting due to a {type(error).__name__}: {error}") diff --git a/tests/server.py b/tests/echo_server.py similarity index 100% rename from tests/server.py rename to tests/echo_server.py diff --git a/tests/queue.py b/tests/queue.py new file mode 100644 index 0000000..627ffc9 --- /dev/null +++ b/tests/queue.py @@ -0,0 +1,31 @@ +import giambio +from debugger import Debugger + + +async def producer(q: giambio.Queue, n: int): + for i in range(n): + await q.put(i) + print(f"Produced {i}") + await q.put(None) + print("Producer done") + + +async def consumer(q: giambio.Queue): + while True: + item = await q.get() + if item is None: + print("Consumer done") + break + print(f"Consumed {item}") + await giambio.sleep(3) + + +async def main(q: giambio.Queue, n: int): + async with giambio.create_pool() as pool: + await pool.spawn(producer, q, n) + await pool.spawn(consumer, q) + + + +queue = giambio.Queue(1) +giambio.run(main, queue, 5, debugger=()) diff --git a/tests/triple_nested_exception.py b/tests/triple_nested_exception.py index 7e292e7..8862abd 100644 --- a/tests/triple_nested_exception.py +++ b/tests/triple_nested_exception.py @@ -32,7 +32,11 @@ async def main(): try: async with giambio.create_pool() as pool: # This pool will run until completion of its - # tasks and then propagate the exception + # tasks and then propagate the exception. This is + # because exception in nested pools are propagated + # all the way down first, then the pools above the + # one that raised the error first wait for their + # children to complete and only then re-raise the original exception await pool.spawn(child) await pool.spawn(child) print("[main] First 2 children spawned, awaiting completion")