From e76998f29fdee9ae8892773fe2b9a8d169e0b3b0 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sun, 27 Feb 2022 18:50:57 +0100 Subject: [PATCH] Added experimental network channels --- giambio/context.py | 2 +- giambio/core.py | 3 +- giambio/sync.py | 69 ++++++++++++++++++++++++++++++++++++++++++++-- tests/task_ipc2.py | 32 +++++++++++++++++++++ 4 files changed, 102 insertions(+), 4 deletions(-) create mode 100644 tests/task_ipc2.py diff --git a/giambio/context.py b/giambio/context.py index c6fdd21..9fe7cfa 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -92,7 +92,7 @@ class TaskManager: if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout: return True except giambio.exceptions.TooSlowError: - if not self.raise_on_timeout: + if self.raise_on_timeout: raise async def cancel(self): diff --git a/giambio/core.py b/giambio/core.py index d2de041..48fb734 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -445,7 +445,8 @@ class AsyncScheduler: for task in pool.tasks: self.join(task) self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point))) - + if pool.entry_point is self.entry_point: + self.run_ready.append(self.entry_point) def schedule_tasks(self, tasks: List[Task]): """ diff --git a/giambio/sync.py b/giambio/sync.py index 5641d86..74a0163 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -15,12 +15,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +from socket import socketpair from abc import ABC, abstractmethod -import abc from collections import deque from typing import Any, Optional from giambio.traps import event_wait, event_set from giambio.exceptions import GiambioError +from giambio.socket import wrap_socket class Event: @@ -258,5 +259,69 @@ class MemoryChannel(Channel): class NetworkChannel(Channel): """ A two-way communication channel between tasks based - on giambio's I/O mechanisms. This variant of a channel + on giambio's I/O mechanisms instead of in-memory queues """ + + def __init__(self): + """ + Public object constructor + """ + + super().__init__(None) + # We use a socket as our buffer instead of a queue + sockets = socketpair() + self.reader = wrap_socket(sockets[0]) + self.writer = wrap_socket(sockets[1]) + self._internal_buffer = b"" + + + async def write(self, data: bytes): + """ + Writes data to the channel. Blocks if the internal + socket is not currently available. Does nothing + if the channel has been closed + """ + + if self.closed: + return + await self.writer.send_all(data) + + async def read(self, size: int): + """ + Reads exactly size bytes from the channel. Blocks until + enough data arrives. Extra data is cached and used on the + next read + """ + + data = self._internal_buffer + while len(data) < size: + data += await self.reader.receive(size) + self._internal_buffer = data[size:] + data = data[:size] + return data + + async def close(self): + """ + Closes the memory channel. Any underlying + data is flushed out of the internal socket + and is lost + """ + + self.closed = True + await self.reader.close() + await self.writer.close() + + async def pending(self): + """ + Returns if there's pending + data to be read + """ + + # TODO: Ugly! + if self.closed: + return False + try: + self._internal_buffer += self.reader.sock.recv(1) + except BlockingIOError: + return False + return True diff --git a/tests/task_ipc2.py b/tests/task_ipc2.py new file mode 100644 index 0000000..673d227 --- /dev/null +++ b/tests/task_ipc2.py @@ -0,0 +1,32 @@ +import random +import string +import giambio +from debugger import Debugger + + +async def task(c: giambio.NetworkChannel, name: str): + while True: + if await c.pending(): + print(f"[{name}] Received {(await c.read(8)).decode()!r}") + else: + data = "".join(random.choice(string.ascii_letters) for _ in range(8)) + print(f"[{name}] Sending {data!r}") + await c.write(data.encode()) + await giambio.sleep(1) + + +async def main(channel: giambio.NetworkChannel, delay: int): + print(f"[main] Spawning workers, exiting in {delay} seconds") + async with giambio.skip_after(delay) as pool: + await pool.spawn(task, channel, "one") + await pool.spawn(task, channel, "two") + await channel.close() + print(f"[main] Operation complete, channel closed") + if await channel.pending(): + print(f"[main] Channel has leftover data, clearing it") + while await channel.pending(): + print(f"[main] Cleared {await channel.read(1)!r}") + + +channel = giambio.NetworkChannel() +giambio.run(main, channel, 4, debugger=()) \ No newline at end of file