Initial highly experimental work on a thread layer

This commit is contained in:
Mattia Giambirtone 2023-05-18 00:06:21 +02:00
parent ac69afd2db
commit 6d0ac9d27d
Signed by: nocturn9x
GPG Key ID: 8270F9F467971E59
5 changed files with 127 additions and 6 deletions

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (structio)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (StructuredIO)" project-jdk-type="Python SDK" />
</project>

View File

@ -11,6 +11,7 @@ from structio.core import task
from structio.core.task import Task, TaskState
from structio.sync import Event, Queue, MemoryChannel, Semaphore, Lock, RLock
from structio.core.abc import Channel, Stream, ChannelReader, ChannelWriter
from structio import thread
def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]],
@ -109,5 +110,6 @@ __all__ = ["run",
"TaskPool",
"ResourceClosed",
"Lock",
"RLock"
"RLock",
"thread"
]

View File

@ -23,7 +23,7 @@ class Event:
"""
self._set = False
self.waiters: deque[Task] = deque()
self._tasks: deque[Task] = deque()
def is_set(self):
return self._set
@ -39,7 +39,7 @@ class Event:
if self.is_set():
await checkpoint()
return
self.waiters.append(current_task())
self._tasks.append(current_task())
await suspend() # We get re-scheduled by set()
@enable_ki_protection
@ -52,9 +52,9 @@ class Event:
if self.is_set():
raise RuntimeError("the event has already been set")
self._set = True
for waiter in self.waiters:
for waiter in self._tasks:
current_loop().reschedule(waiter)
self.waiters.clear()
self._tasks.clear()
class Queue:

91
structio/thread.py Normal file
View File

@ -0,0 +1,91 @@
# Support module for running synchronous functions as
# coroutines into worker threads and to submit asynchronous
# work to the event loop from a synchronous thread
import threading
from typing import Callable, Any, Coroutine
from collections import deque
from structio.sync import Event, Semaphore
from structio.util.ki import enable_ki_protection
_storage = threading.local()
# Max number of concurrent threads that can
# be spawned by run_in_worker before blocking
_storage.max_workers = Semaphore(50)
class AsyncThreadEvent(Event):
"""
An extension of the regular event
class that is safe to utilize both
from threads and from async code
"""
def __init__(self):
super().__init__()
self._lock = threading.Lock()
self._workers: deque[threading.Event] = deque()
@enable_ki_protection
def wait_sync(self):
"""
Like wait(), but synchronous
"""
with self._lock:
if self.is_set():
return
ev = threading.Event()
self._workers.append(ev)
ev.wait()
@enable_ki_protection
async def wait(self):
with self._lock:
if self.is_set():
return
await super().wait()
@enable_ki_protection
def set(self):
with self._lock:
if self.is_set():
return
# Awakes all coroutines
super().set()
# Awakes all threads
for evt in self._workers:
evt.set()
async def run_in_worker(func: Callable[[Any, Any], Coroutine[Any, Any, Any]],
*args,
**kwargs,
):
async with _storage.max_workers:
# This will automatically block once
# we run out of slots and proceed once
# we have more
pass # TODO
def set_max_worker_count(count: int):
"""
Sets a new value for the maximum number of concurrent
worker threads structio is allowed to spawn
"""
# Everything, to avoid the unholy "global"
_storage.max_workers = Semaphore(count)
def get_max_worker_count() -> int:
"""
Gets the maximum number of concurrent worker
threads structio is allowed to spawn
"""
return _storage.max_workers.max_size

View File

@ -1,4 +1,6 @@
import structio
import time
import threading
async def child(ev: structio.Event, n):
@ -22,4 +24,30 @@ async def main(i):
print(f"[main] Exited in {structio.clock() - j:.2f} seconds")
def thread_worker(ev: structio.thread.AsyncThreadEvent):
print("[worker] Worker thread spawned, waiting for event")
t = time.time()
ev.wait_sync()
print(f"[worker] Event was fired after {time.time() - t:.2f} seconds")
async def main_async_thread(i):
print("[main] Parent is alive")
j = structio.clock()
async with structio.create_pool() as pool:
# Identical to structio.Event, but this event
# can talk to threads too
evt = structio.thread.AsyncThreadEvent()
th = threading.Thread(target=thread_worker, args=(evt,))
th.start()
print("[main] Spawning child")
pool.spawn(child, evt, i)
print("[main] Child spawned, waiting on the event")
await evt.wait()
assert evt.is_set()
th.join()
print(f"[main] Exited in {structio.clock() - j:.2f} seconds")
structio.run(main, 5)
structio.run(main_async_thread, 5)