From b64707271a735b892bafe0aa0f1e3a44e0d18cc5 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Fri, 19 May 2023 15:43:55 +0200 Subject: [PATCH] Added a bunch of comments and docs and other things --- README.md | 15 +++- structio/core/run.py | 13 +++- structio/thread.py | 169 ++++++++++++++++++++++++++++++------------- 3 files changed, 143 insertions(+), 54 deletions(-) diff --git a/README.md b/README.md index 2fabd40..b2d5ef8 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,16 @@ # structio -A proof of concept for an experimental structured concurrency framework written in Python \ No newline at end of file +A proof of concept for an experimental structured concurrency framework written in Python + +## Disclaimer + +This library is highly experimental and currently in alpha stage (it doesn't even have a proper version +number yet, that's how alpha it is), so it's not production ready (and probably never will be). If you +want the fancy structured concurrency paradigm in a library that works today, consider [trio](https://trio.readthedocs.org), +from which structio is heavily inspired ([curio](https://github.com/dabeaz/curio) is also worth looking into, although +technically it doesn't implement SC). + +## Why? + +This library (and [its](https://git.nocturn9x.space/nocturn9x/giambio) [predecessors](https://git.nocturn9x.space/nocturn9x/aiosched)) is just a way for me to test my knowledge and make sure I understand the basics of structured concurrency +and building solid coroutine runners so that I can implement the paradigm in my own programming language. For more info, see [here](https://git.nocturn9x.space/nocturn9x/peon). \ No newline at end of file diff --git a/structio/core/run.py b/structio/core/run.py index b4b6825..f5a7222 100644 --- a/structio/core/run.py +++ b/structio/core/run.py @@ -10,6 +10,11 @@ _RUN = local() def current_loop() -> BaseKernel: + """ + Returns the current event loop in the calling + thread. Raises a StructIOException if no async + context exists + """ try: return _RUN.kernel except AttributeError: @@ -17,6 +22,10 @@ def current_loop() -> BaseKernel: def current_task() -> Task: + """ + Shorthand for current_loop().current_task + """ + return current_loop().current_task @@ -33,7 +42,7 @@ def new_event_loop(kernel: BaseKernel): _RUN.kernel = kernel else: if not _RUN.kernel.done(): - raise StructIOException("cannot be called from async context") from None + raise StructIOException("cannot be called from running async context") from None def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], @@ -49,7 +58,7 @@ def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], """ if not issubclass(kernel, BaseKernel): - raise TypeError(f"kernel must be a subclass of structio.core.abc.BaseKernel!") + raise TypeError(f"kernel must be a subclass of {BaseKernel.__module__}.{BaseKernel.__qualname__}!") check = func if isinstance(func, functools.partial): check = func.func diff --git a/structio/thread.py b/structio/thread.py index 6b888c2..10728f6 100644 --- a/structio/thread.py +++ b/structio/thread.py @@ -1,18 +1,18 @@ # Support module for running synchronous functions as # coroutines into worker threads and to submit asynchronous # work to the event loop from a synchronous thread +import structio import threading from collections import deque - -import structio -from structio.sync import Event, Semaphore, Queue -from structio.util.ki import enable_ki_protection -from structio.core.syscalls import checkpoint from structio.abc import BaseKernel from structio.core.run import current_loop from typing import Callable, Any, Coroutine +from structio.core.syscalls import checkpoint +from structio.sync import Event, Semaphore, Queue +from structio.util.ki import enable_ki_protection from structio.exceptions import StructIOException + _storage = threading.local() # Max number of concurrent threads that can # be spawned by run_in_worker before blocking @@ -145,66 +145,108 @@ class AsyncThreadQueue(Queue): return self.container.popleft() -def _threaded_runner(f, q: AsyncThreadQueue, parent_loop: BaseKernel, rq: AsyncThreadQueue, - rsq: AsyncThreadQueue, evt: AsyncThreadEvent, *args): +# Just a bunch of private helpers to run sync/async functions + +def _threaded_runner(f, parent_loop: BaseKernel, rq: AsyncThreadQueue, rsq: AsyncThreadQueue, evt: AsyncThreadEvent, *args): try: + # Setup thread-local storage so future calls + # to run_coro() can find this stuff _storage.parent_loop = parent_loop _storage.rq = rq _storage.rsq = rsq - q.put_sync((True, f(*args))) + result = f(*args) except BaseException as e: - q.put_sync((False, e)) + rsq.put_sync((False, e)) + else: + rsq.put_sync((True, result)) finally: + # Notify the event loop that the thread + # has exited evt.set() @enable_ki_protection -async def _async_waiter(events, results: AsyncThreadQueue): +async def _coroutine_request_handler(events: AsyncThreadQueue, results: AsyncThreadQueue): + """ + Runs coroutines on behalf of a thread spawned by structio and + submits the outcome back to the thread + """ + while True: data = await events.get() if not data: break coro = data try: - await results.put((True, await coro)) + result = await coro except BaseException as e: await results.put((False, e)) + else: + await results.put((True, result)) @enable_ki_protection -async def _wait_for_thread(events, results: AsyncThreadQueue, evt: AsyncThreadEvent, cancellable: bool = False): +async def _wait_for_thread(events: AsyncThreadQueue, results: AsyncThreadQueue, + termination_event: AsyncThreadEvent, cancellable: bool = False): + """ + Waits for a thread spawned by structio to complete and + returns its result. Exceptions are also propagated + """ + async with structio.create_pool() as pool: + # If the operation is cancellable, then we're not + # shielded pool.scope.shielded = not cancellable # Spawn a coroutine to process incoming requests from - # the new async thread - pool.spawn(_async_waiter, events, results) + # the new async thread. We can't await it because it + # needs to run in the background + pool.spawn(_coroutine_request_handler, events, results) # Wait for the thread to terminate - await evt.wait() - # Worker thread has exited: we no longer need to process any - # requests, so we shut our waiter down + await termination_event.wait() + # Worker thread has exited: we no longer need to process + # any requests, so we shut our request handler down await events.put(None) - - -@enable_ki_protection -async def _async_runner(f, cancellable: bool = False, *args): - # Thread termination event - evt = AsyncThreadEvent() - - queue = AsyncThreadQueue(1) - # Request queue - rq = AsyncThreadQueue(0) - # Results queue - rsq = AsyncThreadQueue(0) - current_loop().current_pool.spawn(_wait_for_thread, rq, rsq, evt, cancellable) - th = threading.Thread(target=_threaded_runner, args=(f, queue, current_loop(), rq, rsq, evt, *args), - name="structio-worker-thread", daemon=cancellable) - th.start() - success, data = await queue.get() + # Wait for the final result from the thread + success, data = await results.get() if success: return data raise data +@enable_ki_protection +async def _spawn_supervised_thread(f, cancellable: bool = False, *args): + # Thread termination event + terminate = AsyncThreadEvent() + # Request queue. This is where the thread + # sends coroutines to run + rq = AsyncThreadQueue(0) + # Results queue. This is where we put the result + # of the coroutines in the request queue + rsq = AsyncThreadQueue(0) + # This looks like a lot of bookkeeping to do synchronization, but it all has a purpose. + # The termination event is necessary so that _wait_for_thread can know when to shut + # down (and, by extension, shut down its workers too). The request and result queues + # are used to send coroutines and their results back and forth when using run_coro from + # within the "asynchronous thread". Trying to reduce the amount of primitives turns out + # to be very hard, because we'd have at least 3 different things (_wait_for_thread, + # _threaded_runner and _coroutine_request_handler) trying to work on the same resources, which is + # a hellish nightmare to synchronize properly. For example, _coroutine_request_handler *could* just + # use a single queue for sending data back and forth, but since it runs in a while loop in order to + # handle more than one request, as soon as it would put any data onto the queue and then go to the + # next iteration in the loop, it would (likely, but not always, as it depends on how things get + # scheduled) immediately call get() again, get something out of queue that it doesn't expect and + # crash horribly. So this separation is necessary to retain my sanity + threading.Thread(target=_threaded_runner, args=(f, current_loop(), rq, rsq, terminate, *args), + # We start cancellable threads in daemonic mode so that + # the main thread doesn't get stuck waiting on them forever + # when their associated async counterpart gets cancelled. This + # is due to the fact that there's really no way to "kill" a thread + # (and for good reason!), so we just pretend nothing happened and go + # about our merry way, hoping the thread dies eventually I guess + name="structio-worker-thread", daemon=cancellable).start() + return await _wait_for_thread(rq, rsq, terminate, cancellable) + + @enable_ki_protection async def run_in_worker(sync_func, *args, @@ -213,19 +255,38 @@ async def run_in_worker(sync_func, """ Call the given synchronous function in a separate worker thread, turning it into an async operation. - The result of the call is returned, and any exceptions - are propagated back to the caller. If cancellable equals - False, the default, then the operation cannot be canceled - in any way. If cancellable equals True, cancellation will - cause this function to return early and to abruptly drop - the thread: keep in mind the thread is likely to keep running - in the background, as structio doesn't make any effort to stop - it (it can't). If you call this with cancellable=True, make sure - the operation you're performing is side-effect-free (for example, - the async version of getaddrinfo() uses run_in_worker with cancellable=True - to avoid hogging the event loop when doing domain name resolution but still - be able to fail properly, since no one really cares if a random DNS lookup - keeps running in the background) + Must be called from an asynchronous context (a + StructIOException is raised otherwise). The result + of the call is returned, and any exceptions that occur + are propagated back to the caller. This is semantically + identical to just calling the function itself from within + the async context, but it has the added benefit of 1) Being + partially cancellable (with a catch, see below) and 2) If + the function performs some long-running blocking operation, + calling it in the main thread is not advisable, as it would + cause structio's event loop to grind to a halt, meaning that + timeouts and cancellations don't work, I/O doesn't get scheduled, + and all sorts of nasty things happen (or rather, don't happen, + since no work is getting done). In short, don't do long-running + sync calls in the main thread, use a worker. Also, don't do any + CPU-bound work in it, or you're likely to negatively affect the main + thread anyway because CPython is weird and likes to starve-out I/O + bound threads if there's some CPU-bound workers running (for that kind + of work, you might want to spawn an entire separate process instead). + Now, onto cancellations: If cancellable equals False, then the operation + cannot be canceled in any way (this is the default option). This means + that even if you set a task scope with a timeout or explicitly cancel + the pool where this function is awaited, its effects won't be visible + until after the thread has exited. If cancellable equals True, cancellation + will cause this function to return early and to abruptly drop the thread: + keep in mind that it is likely to keep running in the background, as + structio doesn't make any effort to stop it (it can't). If you call this + with cancellable=True, make sure the operation you're performing is side-effect-free, + or you might get nasty deadlocks or race conditions happening. + + Note: If the number of current active thread workers is equal to the value of get_max_worker_count(), + this function blocks until a slot is available and then proceeds normally. + """ if not hasattr(_storage, "parent_loop"): @@ -239,7 +300,13 @@ async def run_in_worker(sync_func, # we run out of slots and proceed once # we have more async with _storage.max_workers: - return await current_loop().current_pool.spawn(_async_runner, sync_func, cancellable, *args) + # We do a little magic trick and inject the "async thread" as a + # task in the current task pool (keep in mind structio is always + # within some task pool, even if you don't see one explicitly. The + # event loop has its own secret "root" task pool which is a parent to all + # others and where the call to structio.run() as well as any other system + # task run) + return await current_loop().current_pool.spawn(_spawn_supervised_thread, sync_func, cancellable, *args) @enable_ki_protection @@ -247,9 +314,9 @@ def run_coro(async_func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args, **kwargs): """ Submits a coroutine for execution to the event loop, passing any - arguments along the way. Return values and exceptions - are propagated and from the point of view of the calling thread, - this call blocks until the coroutine returns + arguments along the way. Return values and exceptions are propagated + and from the point of view of the calling thread, this call blocks + until the coroutine returns """ try: