Added Event class and reformatted with black

This commit is contained in:
Nocturn9x 2022-10-19 11:54:32 +02:00
parent 3021544e7f
commit c2bb63149b
14 changed files with 190 additions and 30 deletions

View File

@ -20,7 +20,7 @@ from aiosched.internals.syscalls import spawn, wait, sleep, cancel
import aiosched.task
import aiosched.errors
import aiosched.context
from aiosched.sync import Event
__all__ = [
"run",
@ -33,4 +33,5 @@ __all__ = [
"errors",
"cancel",
"with_context",
"Event",
]

View File

@ -17,7 +17,14 @@ limitations under the License.
"""
from aiosched.task import Task
from aiosched.errors import Cancelled
from aiosched.internals.syscalls import spawn, wait, cancel, set_context, close_context, join
from aiosched.internals.syscalls import (
spawn,
wait,
cancel,
set_context,
close_context,
join,
)
from typing import Any, Coroutine, Callable
@ -139,7 +146,10 @@ class TaskContext(Task):
continue
if not task.done():
return False
if not isinstance(self.entry_point, TaskContext) and not self.entry_point.done():
if (
not isinstance(self.entry_point, TaskContext)
and not self.entry_point.done()
):
return False
if self.inner:
return self.inner.done()

View File

@ -96,9 +96,7 @@ class ErrorStack(SchedulerError):
tracebacks = ""
for i, err in enumerate(self.errors):
if i not in (1, len(self.errors)):
tracebacks += (
f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}\n{'-' * 32}\n"
)
tracebacks += f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}\n{'-' * 32}\n"
else:
tracebacks += f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}"
return f"Multiple errors occurred:\n{tracebacks}"

View File

@ -43,6 +43,14 @@ def syscall(method: str, *args, **kwargs) -> Any | None:
return result
async def schedule(task: Task):
"""
Reschedules a task that had been
previously suspended
"""
await syscall("schedule", task)
async def spawn(func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs) -> Task:
"""
Spawns a task from a coroutine and returns it. The coroutine

View File

@ -23,7 +23,13 @@ from timeit import default_timer
from aiosched.internals.queue import TimeQueue
from aiosched.util.debugging import BaseDebugger
from typing import Callable, Any, Coroutine
from aiosched.errors import InternalError, ResourceBusy, Cancelled, ResourceClosed, ResourceBroken
from aiosched.errors import (
InternalError,
ResourceBusy,
Cancelled,
ResourceClosed,
ResourceBroken,
)
from aiosched.context import TaskContext
from selectors import DefaultSelector, BaseSelector
@ -118,7 +124,9 @@ class FIFOKernel:
"""
if not self.done() and not force:
self.current_task.throw(InternalError("cannot shut down a running event loop"))
self.current_task.throw(
InternalError("cannot shut down a running event loop")
)
for task in self.all():
self.cancel(task)
@ -185,6 +193,15 @@ class FIFOKernel:
self.run_ready.append(self.current_task)
def schedule(self, task: Task):
"""
Schedules a task that was previously
suspended
"""
self.run_ready.append(task)
self.reschedule_running()
def suspend(self):
"""
Suspends execution of the current task. This is basically
@ -213,7 +230,7 @@ class FIFOKernel:
# We need to make sure we don't try to execute
# exited tasks that are on the running queue
if not self.run_ready:
return # No more tasks to run!
return # No more tasks to run!
self.current_task = self.run_ready.popleft()
self.debugger.before_task_step(self.current_task)
# Some debugging and internal chatter here
@ -226,15 +243,19 @@ class FIFOKernel:
else:
# Run a single step with the calculation (i.e. until a yield
# somewhere)
method, args, kwargs = self.current_task.run(self.data.pop(self.current_task, None))
method, args, kwargs = self.current_task.run(
self.data.pop(self.current_task, None)
)
if not hasattr(self, method) or not callable(getattr(self, method)):
# This if block is meant to be triggered by other async
# libraries, which most likely have different trap names and behaviors
# compared to us. If you get this exception, and you're 100% sure you're
# not mixing async primitives from other libraries, then it's a bug!
self.current_task.throw(InternalError(
"Uh oh! Something very bad just happened, did you try to mix primitives from other async libraries?"
))
self.current_task.throw(
InternalError(
"Uh oh! Something very bad just happened, did you try to mix primitives from other async libraries?"
)
)
# Sneaky method call, thanks to David Beazley for this ;)
getattr(self, method)(*args, **kwargs)
self.debugger.after_task_step(self.current_task)
@ -289,7 +310,11 @@ class FIFOKernel:
self.run()
finally:
self.debugger.on_exit()
if self.entry_point.exc and self.entry_point.context is None and self.entry_point.propagate:
if (
self.entry_point.exc
and self.entry_point.context is None
and self.entry_point.propagate
):
# Contexts already manage exceptions for us,
# no need to raise it manually
raise self.entry_point.exc
@ -336,9 +361,7 @@ class FIFOKernel:
dict(self.selector.get_map()).values(),
):
for task in k.data:
self.handle_task_run(
partial(task.throw, exc), task
)
self.handle_task_run(partial(task.throw, exc), task)
def cancel(self, task: Task):
"""

View File

@ -62,11 +62,7 @@ def new_event_loop(clock_function: Callable, debugger: BaseDebugger | None = Non
local_storage.loop = FIFOKernel(clock_function, debugger)
def run(
func: Callable[[Any, Any], Coroutine[Any, Any, Any]],
*args,
**kwargs
):
def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args, **kwargs):
"""
Starts the event loop from a synchronous entry point
"""

65
aiosched/sync.py Normal file
View File

@ -0,0 +1,65 @@
"""
aiosched: Yet another Python async scheduler
Copyright (C) 2022 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https:www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from typing import Any
from aiosched.errors import SchedulerError
from aiosched.internals.syscalls import (
suspend,
schedule,
current_task,
)
class Event:
"""
An asynchronous, non thread-safe event
"""
def __init__(self):
"""
Object constructor
"""
self.set = False
self.waiters = set()
def reset(self):
"""
Resets the event's state
"""
self.__init__()
async def trigger(self):
"""
Sets the event, waking up all tasks that called
wait() on it
"""
if self.set:
raise SchedulerError("The event has already been set")
self.set = True
for waiter in self.waiters:
await schedule(waiter)
async def wait(self) -> Any:
"""
Waits until the event is set
"""
self.waiters.add(await current_task())
await suspend() # We get unsuspended by trigger()

View File

@ -32,5 +32,6 @@ async def main(children: list[tuple[str, int]]):
print(f"[main] Child #{i + 1} has exited")
print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds")
if __name__ == "__main__":
aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)])

View File

@ -11,7 +11,9 @@ async def main(children: list[tuple[str, int]]):
print("[main] Children spawned")
before = aiosched.clock()
if ctx.exc:
print(f"[main] Child raised an exception -> {type(ctx.exc).__name__}: {ctx.exc}")
print(
f"[main] Child raised an exception -> {type(ctx.exc).__name__}: {ctx.exc}"
)
print(f"[main] Children exited in {aiosched.clock() - before:.2f} seconds")

View File

@ -14,4 +14,4 @@ async def main(children: list[tuple[str, int]]):
if __name__ == "__main__":
aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)], debugger=None)
aiosched.run(main, [("first", 1), ("second", 2), ("third", 3)], debugger=None)

35
tests/events.py Normal file
View File

@ -0,0 +1,35 @@
from debugger import Debugger
import aiosched
async def child(ev: aiosched.Event, pause: int):
print("[child] Child is alive! Going to wait until notified")
start_total = aiosched.clock()
await ev.wait()
end_pause = aiosched.clock() - start_total
print(f"[child] Parent set the event with, exiting in {pause} seconds")
start_sleep = aiosched.clock()
await aiosched.sleep(pause)
end_sleep = aiosched.clock() - start_sleep
end_total = aiosched.clock() - start_total
print(
f"[child] Done! Slept for {end_total:.2f} seconds total ({end_pause:.2f} waiting, {end_sleep:.2f} sleeping), nice nap!"
)
async def parent(pause: int = 1):
async with aiosched.with_context() as ctx:
event = aiosched.Event()
print("[parent] Spawning child task")
await ctx.spawn(child, event, pause + 2)
start = aiosched.clock()
print(f"[parent] Sleeping {pause} second(s) before setting the event")
await aiosched.sleep(pause)
await event.trigger()
print("[parent] Event set, awaiting child completion")
end = aiosched.clock() - start
print(f"[parent] Child exited in {end:.2f} seconds")
if __name__ == "__main__":
aiosched.run(parent, 3, debugger=None)

View File

@ -4,7 +4,9 @@ from wait import child as successful
from debugger import Debugger
async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]):
async def main(
children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]
):
before = aiosched.clock()
async with aiosched.with_context() as ctx:
print("[main] Spawning children in first context")
@ -23,4 +25,9 @@ async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple
if __name__ == "__main__":
aiosched.run(main, [("first", 1), ("second", 2)], [("third", 3), ("fourth", 4)], debugger=None)
aiosched.run(
main,
[("first", 1), ("second", 2)],
[("third", 3), ("fourth", 4)],
debugger=None,
)

View File

@ -4,7 +4,9 @@ from debugger import Debugger
# TODO: This crashes 1 second later than it should be
async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]):
async def main(
children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]
):
try:
async with aiosched.with_context() as ctx:
before = aiosched.clock()
@ -23,4 +25,9 @@ async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple
if __name__ == "__main__":
aiosched.run(main, [("first", 1), ("second", 2)], [("third", 3), ("fourth", 4)], debugger=None)
aiosched.run(
main,
[("first", 1), ("second", 2)],
[("third", 3), ("fourth", 4)],
debugger=None,
)

View File

@ -3,7 +3,9 @@ from wait import child
from debugger import Debugger
async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]):
async def main(
children_outer: list[tuple[str, int]], children_inner: list[tuple[str, int]]
):
async with aiosched.with_context() as ctx:
before = aiosched.clock()
print("[main] Spawning children in first context")
@ -19,4 +21,9 @@ async def main(children_outer: list[tuple[str, int]], children_inner: list[tuple
if __name__ == "__main__":
aiosched.run(main, [("first", 1), ("second", 2)], [("third", 3), ("fourth", 4)], debugger=None)
aiosched.run(
main,
[("first", 1), ("second", 2)],
[("third", 3), ("fourth", 4)],
debugger=None,
)