diff --git a/giambio/__init__.py b/giambio/__init__.py index 0ced5c2..02c3062 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -22,7 +22,7 @@ __version__ = (0, 0, 1) from giambio import exceptions, socket, context, core, task, io from giambio.traps import sleep, current_task -from giambio.sync import Event, Queue +from giambio.sync import Event, Queue, MemoryChannel from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after from giambio.util import debug @@ -34,6 +34,7 @@ __all__ = [ "sleep", "Event", "Queue", + "MemoryChannel", "run", "clock", "create_pool", diff --git a/giambio/context.py b/giambio/context.py index abc12f9..f610bf4 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -78,14 +78,17 @@ class TaskManager: all the tasks spawned inside the pool """ - for task in self.tasks: - # This forces the interpreter to stop at the - # end of the block and wait for all - # children to exit - await task.join() - self.tasks.remove(task) - self._proper_init = False - if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout: + try: + for task in self.tasks: + # This forces the interpreter to stop at the + # end of the block and wait for all + # children to exit + await task.join() + self.tasks.remove(task) + self._proper_init = False + if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout: + return True + except giambio.exceptions.TooSlowError: return True async def cancel(self): diff --git a/giambio/core.py b/giambio/core.py index ee48c00..58058fb 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -360,6 +360,7 @@ class AsyncScheduler: if self.current_task.last_io: self.io_release_task(self.current_task) + self.current_task.status = "sleep" self.suspended.append(self.current_task) @@ -728,6 +729,8 @@ class AsyncScheduler: self.io_release_task(task) elif task.status == "sleep": self.paused.discard(task) + if task in self.suspended: + self.suspended.remove(task) try: self.do_cancel(task) except CancelledError as cancel: diff --git a/giambio/sync.py b/giambio/sync.py index 9da7a3a..cc7191c 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -45,7 +45,7 @@ class Event: if self.set: raise GiambioError("The event has already been set") self.value = value - await event_set(self) + await event_set(self, value) async def wait(self): """ @@ -73,6 +73,27 @@ class Queue: self.putters = deque() self.container = deque() + def __len__(self): + """ + Returns the length of the queue + """ + + return len(self.container) + + + async def __aiter__(self): + """ + Implements the iterator protocol + """ + + return self + + async def __anext__(self): + """ + Implements the iterator protocol + """ + + return await self.get() async def put(self, item: Any): """ @@ -81,7 +102,7 @@ class Queue: enough space for the queue """ - if self.maxsize and len(self.container) < self.maxsize: + if not self.maxsize or len(self.container) < self.maxsize: self.container.append(item) if self.getters: await self.getters.popleft().trigger(self.container.popleft()) @@ -106,3 +127,74 @@ class Queue: ev = Event() self.getters.append(ev) return await ev.wait() + + async def clear(self): + """ + Clears the queue + """ + + self.container.clear() + + async def reset(self): + """ + Resets the queue + """ + + await self.clear() + self.getters.clear() + self.putters.clear() + + +class MemoryChannel: + """ + A two-way communication channel between tasks based + on giambio's queueing mechanism. Operations on this + object do not perform any I/O or other system call and + are therefore extremely efficient + """ + + def __init__(self, maxsize: Optional[int] = None): + """ + Public object constructor + """ + + # We use a queue as our buffer + self.buffer = Queue(maxsize=maxsize) + self.maxsize = maxsize + self.closed = False + + + async def write(self, data: str): + """ + Writes data to the channel. Blocks if the internal + queue is full until a spot is available + """ + + if self.closed: + return + await self.buffer.put(data) + + async def read(self): + """ + Reads data from the channel. Blocks until + a message arrives or returns immediately if + one is already waiting + """ + + return await self.buffer.get() + + async def close(self): + """ + Closes the memory channel. Any underlying + data is left for clients to read + """ + + self.closed = True + + async def pending(self): + """ + Returns if there's pending + data to be read + """ + + return bool(len(self.buffer)) \ No newline at end of file diff --git a/giambio/traps.py b/giambio/traps.py index b15a838..2e1e8f6 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -150,7 +150,7 @@ async def cancel(task): """ await create_trap("cancel", task) - assert task.cancelled, f"Task ignored CancelledError" + assert task.done(), f"Task ignored CancelledError" async def want_read(stream): @@ -198,13 +198,14 @@ async def event_wait(event): return await suspend() -async def event_set(event): +async def event_set(event, value): """ Sets the given event and reawakens its waiters """ event.set = True + event.value = value loop = await current_loop() for waiter in event.waiters: loop._data[waiter] = event.value diff --git a/tests/memory_channel.py b/tests/memory_channel.py new file mode 100644 index 0000000..ee2a8e8 --- /dev/null +++ b/tests/memory_channel.py @@ -0,0 +1,31 @@ +import giambio +from debugger import Debugger + + +async def sender(c: giambio.MemoryChannel, n: int): + for i in range(n): + await c.write(str(i)) + print(f"Sent {i}") + await c.close() + print("Sender done") + + +async def receiver(c: giambio.MemoryChannel): + while True: + if not await c.pending() and c.closed: + print("Receiver done") + break + item = await c.read() + print(f"Received {item}") + await giambio.sleep(1) + + +async def main(channel: giambio.MemoryChannel, n: int): + async with giambio.create_pool() as pool: + await pool.spawn(sender, channel, n) + await pool.spawn(receiver, channel) + + + +channel = giambio.MemoryChannel(2) +giambio.run(main, channel, 5, debugger=()) \ No newline at end of file diff --git a/tests/queue.py b/tests/queue.py index 7787bb1..afe761a 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -27,5 +27,5 @@ async def main(q: giambio.Queue, n: int): -queue = giambio.Queue(2) +queue = giambio.Queue() giambio.run(main, queue, 5, debugger=()) diff --git a/tests/task_ipc.py b/tests/task_ipc.py new file mode 100644 index 0000000..79617df --- /dev/null +++ b/tests/task_ipc.py @@ -0,0 +1,33 @@ +import random +import string +import giambio +from debugger import Debugger + + +async def task(c: giambio.MemoryChannel, name: str): + while True: + if await c.pending(): + print(f"[{name}] Received {await c.read()!r}") + else: + data = "".join(random.choice(string.ascii_letters) for _ in range(8)) + print(f"[{name}] Sending {data!r}") + await c.write(data) + await giambio.sleep(1) + + +async def main(channel: giambio.MemoryChannel): + print("[main] Spawning workers") + async with giambio.skip_after(5) as pool: + await pool.spawn(task, channel, "one") + await pool.spawn(task, channel, "two") + await pool.spawn(task, channel, "three") + await channel.close() + print(f"[main] Operation complete, channel closed") + if await channel.pending(): + print(f"[main] Channel has {len(channel.buffer)} leftover packets of data, clearing it") + while await channel.pending(): + print(f"Cleared {await channel.read()!r}") + + +channel = giambio.MemoryChannel() +giambio.run(main, channel, debugger=()) \ No newline at end of file