Added locks

This commit is contained in:
Nocturn9x 2022-05-10 11:56:47 +02:00
parent e76998f29f
commit ad34be8754
8 changed files with 116 additions and 29 deletions

View File

@ -22,7 +22,7 @@ __version__ = (0, 0, 1)
from giambio import exceptions, socket, context, core, task, io from giambio import exceptions, socket, context, core, task, io
from giambio.traps import sleep, current_task from giambio.traps import sleep, current_task
from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel, Lock
from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after
from giambio.util import debug from giambio.util import debug
@ -37,6 +37,7 @@ __all__ = [
"Channel", "Channel",
"NetworkChannel", "NetworkChannel",
"MemoryChannel", "MemoryChannel",
"Lock",
"run", "run",
"clock", "clock",
"create_pool", "create_pool",

View File

@ -130,6 +130,7 @@ class AsyncScheduler:
self.entry_point: Optional[Task] = None self.entry_point: Optional[Task] = None
# Suspended tasks # Suspended tasks
self.suspended: deque = deque() self.suspended: deque = deque()
def __repr__(self): def __repr__(self):
""" """
@ -209,7 +210,10 @@ class AsyncScheduler:
# after it is set, but it makes the implementation easier # after it is set, but it makes the implementation easier
if not self.current_pool and self.current_task.pool: if not self.current_pool and self.current_task.pool:
self.current_pool = self.current_task.pool self.current_pool = self.current_task.pool
self.deadlines.put(self.current_pool) pool = self.current_pool
while pool:
self.deadlines.put(pool)
pool = self.current_pool.enclosed_pool
# If there are no actively running tasks, we start by # If there are no actively running tasks, we start by
# checking for I/O. This method will wait for I/O until # checking for I/O. This method will wait for I/O until
# the closest deadline to avoid starving sleeping tasks # the closest deadline to avoid starving sleeping tasks
@ -288,16 +292,18 @@ class AsyncScheduler:
account, that's self.run's job! account, that's self.run's job!
""" """
data = None
# Sets the currently running task # Sets the currently running task
self.current_task = self.run_ready.popleft() self.current_task = self.run_ready.popleft()
if self.current_task.done(): if self.current_task.done():
# We need to make sure we don't try to execute # We need to make sure we don't try to execute
# exited tasks that are on the running queue # exited tasks that are on the running queue
return return
if not self.current_pool and self.current_task.pool: if not self.current_pool:
self.current_pool = self.current_task.pool self.current_pool = self.current_task.pool
self.deadlines.put(self.current_pool) pool = self.current_pool
while pool:
self.deadlines.put(pool)
pool = self.current_pool.enclosed_pool
self.debugger.before_task_step(self.current_task) self.debugger.before_task_step(self.current_task)
# Some debugging and internal chatter here # Some debugging and internal chatter here
self.current_task.status = "run" self.current_task.status = "run"
@ -363,7 +369,6 @@ class AsyncScheduler:
self.current_task.status = "sleep" self.current_task.status = "sleep"
self.suspended.append(self.current_task) self.suspended.append(self.current_task)
def reschedule_running(self): def reschedule_running(self):
""" """
Reschedules the currently running task Reschedules the currently running task
@ -444,8 +449,8 @@ class AsyncScheduler:
self.cancel_pool(pool) self.cancel_pool(pool)
for task in pool.tasks: for task in pool.tasks:
self.join(task) self.join(task)
self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point)))
if pool.entry_point is self.entry_point: if pool.entry_point is self.entry_point:
self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point)))
self.run_ready.append(self.entry_point) self.run_ready.append(self.entry_point)
def schedule_tasks(self, tasks: List[Task]): def schedule_tasks(self, tasks: List[Task]):
@ -480,6 +485,7 @@ class AsyncScheduler:
self.run_ready.append(task) self.run_ready.append(task)
self.debugger.after_sleep(task, slept) self.debugger.after_sleep(task, slept)
def get_closest_deadline(self) -> float: def get_closest_deadline(self) -> float:
""" """
Gets the closest expiration deadline (asleep tasks, timeouts) Gets the closest expiration deadline (asleep tasks, timeouts)
@ -619,6 +625,16 @@ class AsyncScheduler:
elif not self.done(): elif not self.done():
raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit") raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit")
self.shutdown() self.shutdown()
# We reset the event loop's state
self.tasks = []
self.entry_point = None
self.current_pool = None
self.current_task = None
self.paused = TimeQueue(self.clock)
self.deadlines = DeadlinesQueue()
self.run_ready = deque()
self.suspended = deque()
def reschedule_joiners(self, task: Task): def reschedule_joiners(self, task: Task):
""" """

View File

@ -95,19 +95,19 @@ def create_pool():
return TaskManager() return TaskManager()
def with_timeout(timeout: int or float): def with_timeout(timeout: int or float):
""" """
Creates an async pool with an associated timeout Creates an async pool with an associated timeout
""" """
assert timeout > 0, "The timeout must be greater than 0" assert timeout > 0, "The timeout must be greater than 0"
mgr = TaskManager(timeout)
loop = get_event_loop() loop = get_event_loop()
if loop.current_task.pool is None: mgr = TaskManager(timeout)
loop.current_pool = mgr if loop.current_task is not loop.entry_point:
loop.current_task.pool = mgr mgr.tasks.append(loop.current_task)
loop.current_task.next_deadline = mgr.timeout or 0.0 if loop.current_pool and loop.current_pool is not mgr:
loop.deadlines.put(mgr) loop.current_pool.enclosed_pool = mgr
return mgr return mgr
@ -119,11 +119,11 @@ def skip_after(timeout: int or float):
""" """
assert timeout > 0, "The timeout must be greater than 0" assert timeout > 0, "The timeout must be greater than 0"
mgr = TaskManager(timeout, False)
loop = get_event_loop() loop = get_event_loop()
if loop.current_task.pool is None: mgr = TaskManager(timeout, False)
loop.current_pool = mgr if loop.current_task is not loop.entry_point:
loop.current_task.pool = mgr mgr.tasks.append(loop.current_task)
loop.current_task.next_deadline = mgr.timeout or 0.0 if loop.current_pool and loop.current_pool is not mgr:
loop.deadlines.put(mgr) loop.current_pool.enclosed_pool = mgr
return mgr return mgr

View File

@ -19,9 +19,10 @@ from socket import socketpair
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections import deque from collections import deque
from typing import Any, Optional from typing import Any, Optional
from giambio.traps import event_wait, event_set from giambio.traps import event_wait, event_set, current_task
from giambio.exceptions import GiambioError from giambio.exceptions import GiambioError
from giambio.socket import wrap_socket from giambio.socket import wrap_socket
from giambio.task import Task
class Event: class Event:
@ -72,7 +73,11 @@ class Queue:
""" """
self.maxsize = maxsize self.maxsize = maxsize
# Stores event objects for tasks wanting to
# get items from the queue
self.getters = deque() self.getters = deque()
# Stores event objects for tasks wanting to
# put items on the queue
self.putters = deque() self.putters = deque()
self.container = deque() self.container = deque()
@ -84,16 +89,19 @@ class Queue:
return len(self.container) return len(self.container)
def __repr__(self) -> str:
return f"{type(self).__name__}({f', '.join(map(str, self.container))})"
async def __aiter__(self): async def __aiter__(self):
""" """
Implements the iterator protocol Implements the asynchronous iterator protocol
""" """
return self return self
async def __anext__(self): async def __anext__(self):
""" """
Implements the iterator protocol Implements the asynchronous iterator protocol
""" """
return await self.get() return await self.get()
@ -325,3 +333,56 @@ class NetworkChannel(Channel):
except BlockingIOError: except BlockingIOError:
return False return False
return True return True
class Lock:
"""
A simple single-owner lock
"""
def __init__(self):
"""
Public constructor
"""
self.owner: Optional[Task] = None
self.tasks: deque[Event] = deque()
async def acquire(self):
"""
Acquires the lock
"""
task = await current_task()
if self.owner is None:
self.owner = task
elif task is self.owner:
raise RuntimeError("lock is already acquired by current task")
elif self.owner is not task:
self.tasks.append(Event())
await self.tasks[-1].wait()
self.owner = task
async def release(self):
"""
Releases the lock
"""
task = await current_task()
if self.owner is None:
raise RuntimeError("lock is not acquired")
elif self.owner is not task:
raise RuntimeError("lock can only released by its owner")
elif self.tasks:
await self.tasks.popleft().trigger()
else:
self.owner = None
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, *args):
await self.release()

View File

@ -209,4 +209,4 @@ async def event_set(event, value):
loop = await current_loop() loop = await current_loop()
for waiter in event.waiters: for waiter in event.waiters:
loop._data[waiter] = event.value loop._data[waiter] = event.value
await schedule_tasks(event.waiters) await schedule_tasks(event.waiters)

View File

@ -4,6 +4,9 @@ from debugger import Debugger
async def producer(q: giambio.Queue, n: int): async def producer(q: giambio.Queue, n: int):
for i in range(n): for i in range(n):
# This will wait until the
# queue is emptied by the
# consumer
await q.put(i) await q.put(i)
print(f"Produced {i}") print(f"Produced {i}")
await q.put(None) await q.put(None)
@ -12,11 +15,16 @@ async def producer(q: giambio.Queue, n: int):
async def consumer(q: giambio.Queue): async def consumer(q: giambio.Queue):
while True: while True:
# Hangs until there is
# something on the queue
item = await q.get() item = await q.get()
if item is None: if item is None:
print("Consumer done") print("Consumer done")
break break
print(f"Consumed {item}") print(f"Consumed {item}")
# Simulates some work so the
# producer waits before putting
# the next value
await giambio.sleep(1) await giambio.sleep(1)
@ -24,8 +32,8 @@ async def main(q: giambio.Queue, n: int):
async with giambio.create_pool() as pool: async with giambio.create_pool() as pool:
await pool.spawn(producer, q, n) await pool.spawn(producer, q, n)
await pool.spawn(consumer, q) await pool.spawn(consumer, q)
print("Bye!")
queue = giambio.Queue() queue = giambio.Queue(1) # Queue has size limit of 1
giambio.run(main, queue, 5, debugger=()) giambio.run(main, queue, 5, debugger=())

View File

@ -1,3 +1,4 @@
## SImple task IPC using giambio's MemoryChannel class
import random import random
import string import string
import giambio import giambio

View File

@ -11,11 +11,11 @@ async def child(name: int):
async def main(): async def main():
start = giambio.clock() start = giambio.clock()
async with giambio.skip_after(10) as pool: async with giambio.skip_after(10) as pool:
await pool.spawn(child, 7) # This will complete await pool.spawn(child, 7) # This will complete
await giambio.sleep(2) # This will make the code below wait 2 seconds await giambio.sleep(2) # This will make the code below wait 2 seconds
await pool.spawn(child, 15) # This will not complete await pool.spawn(child, 15) # This will not complete
await giambio.sleep(50) await giambio.sleep(50) # Neither will this
await child(20) # Neither will this await child(20) # Nor this
if pool.timed_out: if pool.timed_out:
print("[main] One or more children have timed out!") print("[main] One or more children have timed out!")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")