Added experimental memory channels

This commit is contained in:
Nocturn9x 2022-02-27 12:41:23 +01:00
parent 3eb6844848
commit 3ec0864734
8 changed files with 178 additions and 14 deletions

View File

@ -22,7 +22,7 @@ __version__ = (0, 0, 1)
from giambio import exceptions, socket, context, core, task, io
from giambio.traps import sleep, current_task
from giambio.sync import Event, Queue
from giambio.sync import Event, Queue, MemoryChannel
from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after
from giambio.util import debug
@ -34,6 +34,7 @@ __all__ = [
"sleep",
"Event",
"Queue",
"MemoryChannel",
"run",
"clock",
"create_pool",

View File

@ -78,14 +78,17 @@ class TaskManager:
all the tasks spawned inside the pool
"""
for task in self.tasks:
# This forces the interpreter to stop at the
# end of the block and wait for all
# children to exit
await task.join()
self.tasks.remove(task)
self._proper_init = False
if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout:
try:
for task in self.tasks:
# This forces the interpreter to stop at the
# end of the block and wait for all
# children to exit
await task.join()
self.tasks.remove(task)
self._proper_init = False
if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout:
return True
except giambio.exceptions.TooSlowError:
return True
async def cancel(self):

View File

@ -360,6 +360,7 @@ class AsyncScheduler:
if self.current_task.last_io:
self.io_release_task(self.current_task)
self.current_task.status = "sleep"
self.suspended.append(self.current_task)
@ -728,6 +729,8 @@ class AsyncScheduler:
self.io_release_task(task)
elif task.status == "sleep":
self.paused.discard(task)
if task in self.suspended:
self.suspended.remove(task)
try:
self.do_cancel(task)
except CancelledError as cancel:

View File

@ -45,7 +45,7 @@ class Event:
if self.set:
raise GiambioError("The event has already been set")
self.value = value
await event_set(self)
await event_set(self, value)
async def wait(self):
"""
@ -73,6 +73,27 @@ class Queue:
self.putters = deque()
self.container = deque()
def __len__(self):
"""
Returns the length of the queue
"""
return len(self.container)
async def __aiter__(self):
"""
Implements the iterator protocol
"""
return self
async def __anext__(self):
"""
Implements the iterator protocol
"""
return await self.get()
async def put(self, item: Any):
"""
@ -81,7 +102,7 @@ class Queue:
enough space for the queue
"""
if self.maxsize and len(self.container) < self.maxsize:
if not self.maxsize or len(self.container) < self.maxsize:
self.container.append(item)
if self.getters:
await self.getters.popleft().trigger(self.container.popleft())
@ -106,3 +127,74 @@ class Queue:
ev = Event()
self.getters.append(ev)
return await ev.wait()
async def clear(self):
"""
Clears the queue
"""
self.container.clear()
async def reset(self):
"""
Resets the queue
"""
await self.clear()
self.getters.clear()
self.putters.clear()
class MemoryChannel:
"""
A two-way communication channel between tasks based
on giambio's queueing mechanism. Operations on this
object do not perform any I/O or other system call and
are therefore extremely efficient
"""
def __init__(self, maxsize: Optional[int] = None):
"""
Public object constructor
"""
# We use a queue as our buffer
self.buffer = Queue(maxsize=maxsize)
self.maxsize = maxsize
self.closed = False
async def write(self, data: str):
"""
Writes data to the channel. Blocks if the internal
queue is full until a spot is available
"""
if self.closed:
return
await self.buffer.put(data)
async def read(self):
"""
Reads data from the channel. Blocks until
a message arrives or returns immediately if
one is already waiting
"""
return await self.buffer.get()
async def close(self):
"""
Closes the memory channel. Any underlying
data is left for clients to read
"""
self.closed = True
async def pending(self):
"""
Returns if there's pending
data to be read
"""
return bool(len(self.buffer))

View File

@ -150,7 +150,7 @@ async def cancel(task):
"""
await create_trap("cancel", task)
assert task.cancelled, f"Task ignored CancelledError"
assert task.done(), f"Task ignored CancelledError"
async def want_read(stream):
@ -198,13 +198,14 @@ async def event_wait(event):
return await suspend()
async def event_set(event):
async def event_set(event, value):
"""
Sets the given event and reawakens its
waiters
"""
event.set = True
event.value = value
loop = await current_loop()
for waiter in event.waiters:
loop._data[waiter] = event.value

31
tests/memory_channel.py Normal file
View File

@ -0,0 +1,31 @@
import giambio
from debugger import Debugger
async def sender(c: giambio.MemoryChannel, n: int):
for i in range(n):
await c.write(str(i))
print(f"Sent {i}")
await c.close()
print("Sender done")
async def receiver(c: giambio.MemoryChannel):
while True:
if not await c.pending() and c.closed:
print("Receiver done")
break
item = await c.read()
print(f"Received {item}")
await giambio.sleep(1)
async def main(channel: giambio.MemoryChannel, n: int):
async with giambio.create_pool() as pool:
await pool.spawn(sender, channel, n)
await pool.spawn(receiver, channel)
channel = giambio.MemoryChannel(2)
giambio.run(main, channel, 5, debugger=())

View File

@ -27,5 +27,5 @@ async def main(q: giambio.Queue, n: int):
queue = giambio.Queue(2)
queue = giambio.Queue()
giambio.run(main, queue, 5, debugger=())

33
tests/task_ipc.py Normal file
View File

@ -0,0 +1,33 @@
import random
import string
import giambio
from debugger import Debugger
async def task(c: giambio.MemoryChannel, name: str):
while True:
if await c.pending():
print(f"[{name}] Received {await c.read()!r}")
else:
data = "".join(random.choice(string.ascii_letters) for _ in range(8))
print(f"[{name}] Sending {data!r}")
await c.write(data)
await giambio.sleep(1)
async def main(channel: giambio.MemoryChannel):
print("[main] Spawning workers")
async with giambio.skip_after(5) as pool:
await pool.spawn(task, channel, "one")
await pool.spawn(task, channel, "two")
await pool.spawn(task, channel, "three")
await channel.close()
print(f"[main] Operation complete, channel closed")
if await channel.pending():
print(f"[main] Channel has {len(channel.buffer)} leftover packets of data, clearing it")
while await channel.pending():
print(f"Cleared {await channel.read()!r}")
channel = giambio.MemoryChannel()
giambio.run(main, channel, debugger=())