Initial highly experimental work on a thread layer
This commit is contained in:
parent
f5ec5beab3
commit
1b4193ce79
|
@ -1,4 +1,4 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<project version="4">
|
<project version="4">
|
||||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (structio)" project-jdk-type="Python SDK" />
|
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (StructuredIO)" project-jdk-type="Python SDK" />
|
||||||
</project>
|
</project>
|
|
@ -11,6 +11,7 @@ from structio.core import task
|
||||||
from structio.core.task import Task, TaskState
|
from structio.core.task import Task, TaskState
|
||||||
from structio.sync import Event, Queue, MemoryChannel, Semaphore, Lock, RLock
|
from structio.sync import Event, Queue, MemoryChannel, Semaphore, Lock, RLock
|
||||||
from structio.core.abc import Channel, Stream, ChannelReader, ChannelWriter
|
from structio.core.abc import Channel, Stream, ChannelReader, ChannelWriter
|
||||||
|
from structio import thread
|
||||||
|
|
||||||
|
|
||||||
def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]],
|
def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]],
|
||||||
|
@ -109,5 +110,6 @@ __all__ = ["run",
|
||||||
"TaskPool",
|
"TaskPool",
|
||||||
"ResourceClosed",
|
"ResourceClosed",
|
||||||
"Lock",
|
"Lock",
|
||||||
"RLock"
|
"RLock",
|
||||||
|
"thread"
|
||||||
]
|
]
|
||||||
|
|
|
@ -23,7 +23,7 @@ class Event:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self._set = False
|
self._set = False
|
||||||
self.waiters: deque[Task] = deque()
|
self._tasks: deque[Task] = deque()
|
||||||
|
|
||||||
def is_set(self):
|
def is_set(self):
|
||||||
return self._set
|
return self._set
|
||||||
|
@ -39,7 +39,7 @@ class Event:
|
||||||
if self.is_set():
|
if self.is_set():
|
||||||
await checkpoint()
|
await checkpoint()
|
||||||
return
|
return
|
||||||
self.waiters.append(current_task())
|
self._tasks.append(current_task())
|
||||||
await suspend() # We get re-scheduled by set()
|
await suspend() # We get re-scheduled by set()
|
||||||
|
|
||||||
@enable_ki_protection
|
@enable_ki_protection
|
||||||
|
@ -52,9 +52,9 @@ class Event:
|
||||||
if self.is_set():
|
if self.is_set():
|
||||||
raise RuntimeError("the event has already been set")
|
raise RuntimeError("the event has already been set")
|
||||||
self._set = True
|
self._set = True
|
||||||
for waiter in self.waiters:
|
for waiter in self._tasks:
|
||||||
current_loop().reschedule(waiter)
|
current_loop().reschedule(waiter)
|
||||||
self.waiters.clear()
|
self._tasks.clear()
|
||||||
|
|
||||||
|
|
||||||
class Queue:
|
class Queue:
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
# Support module for running synchronous functions as
|
||||||
|
# coroutines into worker threads and to submit asynchronous
|
||||||
|
# work to the event loop from a synchronous thread
|
||||||
|
import threading
|
||||||
|
from typing import Callable, Any, Coroutine
|
||||||
|
from collections import deque
|
||||||
|
from structio.sync import Event, Semaphore
|
||||||
|
from structio.util.ki import enable_ki_protection
|
||||||
|
|
||||||
|
|
||||||
|
_storage = threading.local()
|
||||||
|
# Max number of concurrent threads that can
|
||||||
|
# be spawned by run_in_worker before blocking
|
||||||
|
_storage.max_workers = Semaphore(50)
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncThreadEvent(Event):
|
||||||
|
"""
|
||||||
|
An extension of the regular event
|
||||||
|
class that is safe to utilize both
|
||||||
|
from threads and from async code
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._workers: deque[threading.Event] = deque()
|
||||||
|
|
||||||
|
@enable_ki_protection
|
||||||
|
def wait_sync(self):
|
||||||
|
"""
|
||||||
|
Like wait(), but synchronous
|
||||||
|
"""
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
if self.is_set():
|
||||||
|
return
|
||||||
|
ev = threading.Event()
|
||||||
|
self._workers.append(ev)
|
||||||
|
ev.wait()
|
||||||
|
|
||||||
|
@enable_ki_protection
|
||||||
|
async def wait(self):
|
||||||
|
with self._lock:
|
||||||
|
if self.is_set():
|
||||||
|
return
|
||||||
|
await super().wait()
|
||||||
|
|
||||||
|
@enable_ki_protection
|
||||||
|
def set(self):
|
||||||
|
with self._lock:
|
||||||
|
if self.is_set():
|
||||||
|
return
|
||||||
|
# Awakes all coroutines
|
||||||
|
super().set()
|
||||||
|
# Awakes all threads
|
||||||
|
for evt in self._workers:
|
||||||
|
evt.set()
|
||||||
|
|
||||||
|
|
||||||
|
async def run_in_worker(func: Callable[[Any, Any], Coroutine[Any, Any, Any]],
|
||||||
|
*args,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
async with _storage.max_workers:
|
||||||
|
# This will automatically block once
|
||||||
|
# we run out of slots and proceed once
|
||||||
|
# we have more
|
||||||
|
pass # TODO
|
||||||
|
|
||||||
|
|
||||||
|
def set_max_worker_count(count: int):
|
||||||
|
"""
|
||||||
|
Sets a new value for the maximum number of concurrent
|
||||||
|
worker threads structio is allowed to spawn
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Everything, to avoid the unholy "global"
|
||||||
|
_storage.max_workers = Semaphore(count)
|
||||||
|
|
||||||
|
|
||||||
|
def get_max_worker_count() -> int:
|
||||||
|
"""
|
||||||
|
Gets the maximum number of concurrent worker
|
||||||
|
threads structio is allowed to spawn
|
||||||
|
"""
|
||||||
|
|
||||||
|
return _storage.max_workers.max_size
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,6 @@
|
||||||
import structio
|
import structio
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
async def child(ev: structio.Event, n):
|
async def child(ev: structio.Event, n):
|
||||||
|
@ -22,4 +24,30 @@ async def main(i):
|
||||||
print(f"[main] Exited in {structio.clock() - j:.2f} seconds")
|
print(f"[main] Exited in {structio.clock() - j:.2f} seconds")
|
||||||
|
|
||||||
|
|
||||||
|
def thread_worker(ev: structio.thread.AsyncThreadEvent):
|
||||||
|
print("[worker] Worker thread spawned, waiting for event")
|
||||||
|
t = time.time()
|
||||||
|
ev.wait_sync()
|
||||||
|
print(f"[worker] Event was fired after {time.time() - t:.2f} seconds")
|
||||||
|
|
||||||
|
|
||||||
|
async def main_async_thread(i):
|
||||||
|
print("[main] Parent is alive")
|
||||||
|
j = structio.clock()
|
||||||
|
async with structio.create_pool() as pool:
|
||||||
|
# Identical to structio.Event, but this event
|
||||||
|
# can talk to threads too
|
||||||
|
evt = structio.thread.AsyncThreadEvent()
|
||||||
|
th = threading.Thread(target=thread_worker, args=(evt,))
|
||||||
|
th.start()
|
||||||
|
print("[main] Spawning child")
|
||||||
|
pool.spawn(child, evt, i)
|
||||||
|
print("[main] Child spawned, waiting on the event")
|
||||||
|
await evt.wait()
|
||||||
|
assert evt.is_set()
|
||||||
|
th.join()
|
||||||
|
print(f"[main] Exited in {structio.clock() - j:.2f} seconds")
|
||||||
|
|
||||||
|
|
||||||
structio.run(main, 5)
|
structio.run(main, 5)
|
||||||
|
structio.run(main_async_thread, 5)
|
||||||
|
|
Loading…
Reference in New Issue