From f38e508ef6648c2755c62cd54aed6c97c8330421 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sat, 26 Feb 2022 19:35:03 +0100 Subject: [PATCH 1/9] Fixed bug where suspended tasks would not be purged upon exiting --- giambio/core.py | 4 ++++ tests/chatroom_client.py | 1 + tests/timeout.py | 8 ++++---- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/giambio/core.py b/giambio/core.py index 237c76b..1715271 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -644,6 +644,8 @@ class AsyncScheduler: self.debugger.on_task_exit(task) if task.last_io: self.io_release_task(task) + if task in self.suspended: + self.suspended.remove(task) # If the pool has finished executing or we're at the first parent # task that kicked the loop, we can safely reschedule the parent(s) if task.pool is None: @@ -651,6 +653,8 @@ class AsyncScheduler: if task.pool.done(): self.reschedule_joiners(task) elif task.exc: + if task in self.suspended: + self.suspended.remove(task) task.status = "crashed" if task.exc.__traceback__: # TODO: We might want to do a bit more complex traceback hacking to remove any extra diff --git a/tests/chatroom_client.py b/tests/chatroom_client.py index 80ac22e..a1e28d2 100644 --- a/tests/chatroom_client.py +++ b/tests/chatroom_client.py @@ -8,6 +8,7 @@ async def sender(sock: giambio.socket.AsyncSocket, q: giambio.Queue): while True: await sock.send_all(b"yo") await q.put((0, "")) + await giambio.sleep(1) async def receiver(sock: giambio.socket.AsyncSocket, q: giambio.Queue): diff --git a/tests/timeout.py b/tests/timeout.py index 01d4e50..dc1cc5f 100644 --- a/tests/timeout.py +++ b/tests/timeout.py @@ -12,11 +12,11 @@ async def main(): start = giambio.clock() try: async with giambio.with_timeout(12) as pool: - await pool.spawn(child, 7) # This will complete - await giambio.sleep(2) # This will make the code below wait 2 seconds + await pool.spawn(child, 7) # This will complete + await giambio.sleep(2) # This will make the code below wait 2 seconds await pool.spawn(child, 15) # This will not complete - await giambio.sleep(50) - await child(20) # Neither will this + await child(20) # Neither will this + await giambio.sleep(50) # Nor this except giambio.exceptions.TooSlowError: print("[main] One or more children have timed out!") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") From da9a579caf8177c59cf8fc75e3962c14c7a6ce47 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sat, 26 Feb 2022 19:35:42 +0100 Subject: [PATCH 2/9] Changed version back to 0.x.x because giambio isn't ready yet --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index c8e9aa5..b9adc85 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as readme: setuptools.setup( name="GiambIO", - version="1.0.1", + version="0.5.0", author="Nocturn9x", author_email="hackhab@gmail.com", description="Asynchronous Python made easy (and friendly)", From 3eb6844848fb1b0f3112f825426661e4afa5581a Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sat, 26 Feb 2022 21:59:18 +0100 Subject: [PATCH 3/9] Hopefully fixed giambio.Queue --- giambio/core.py | 5 ++++- giambio/sync.py | 18 ++++++++++-------- giambio/traps.py | 2 +- tests/queue.py | 5 ++--- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/giambio/core.py b/giambio/core.py index 1715271..ee48c00 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -95,7 +95,7 @@ class AsyncScheduler: or type( "DumbDebugger", (object,), - {"__getattr__": lambda *args: lambda *arg: None}, + {"__getattr__": lambda *_: lambda *_: None}, )() ) # All tasks the loop has @@ -151,6 +151,8 @@ class AsyncScheduler: "_data", "io_skip_limit", "io_max_timeout", + "suspended", + "entry_point" } data = ", ".join( name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields)) @@ -360,6 +362,7 @@ class AsyncScheduler: self.io_release_task(self.current_task) self.suspended.append(self.current_task) + def reschedule_running(self): """ Reschedules the currently running task diff --git a/giambio/sync.py b/giambio/sync.py index f6babab..9da7a3a 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -17,7 +17,7 @@ limitations under the License. """ from collections import deque from typing import Any, Optional -from giambio.traps import event_wait, event_set, current_task, suspend, schedule_tasks, current_loop +from giambio.traps import event_wait, event_set from giambio.exceptions import GiambioError @@ -71,7 +71,7 @@ class Queue: self.maxsize = maxsize self.getters = deque() self.putters = deque() - self.container = deque(maxlen=maxsize) + self.container = deque() async def put(self, item: Any): @@ -81,14 +81,15 @@ class Queue: enough space for the queue """ - if not self.maxsize or len(self.container) < self.maxsize: + if self.maxsize and len(self.container) < self.maxsize: self.container.append(item) if self.getters: await self.getters.popleft().trigger(self.container.popleft()) else: - self.putters.append(Event()) - await self.putters[-1].wait() - + ev = Event() + self.putters.append(ev) + await ev.wait() + self.container.append(item) async def get(self) -> Any: """ @@ -102,5 +103,6 @@ class Queue: await self.putters.popleft().trigger() return self.container.popleft() else: - self.getters.append(Event()) - return await self.getters[-1].wait() \ No newline at end of file + ev = Event() + self.getters.append(ev) + return await ev.wait() diff --git a/giambio/traps.py b/giambio/traps.py index 240917c..b15a838 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -195,7 +195,7 @@ async def event_wait(event): return task = await current_task() event.waiters.add(task) - return await create_trap("suspend") + return await suspend() async def event_set(event): diff --git a/tests/queue.py b/tests/queue.py index d8dc4bb..7787bb1 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -18,15 +18,14 @@ async def consumer(q: giambio.Queue): break print(f"Consumed {item}") await giambio.sleep(1) - async def main(q: giambio.Queue, n: int): async with giambio.create_pool() as pool: - await pool.spawn(consumer, q) await pool.spawn(producer, q, n) + await pool.spawn(consumer, q) -queue = giambio.Queue() +queue = giambio.Queue(2) giambio.run(main, queue, 5, debugger=()) From 3ec0864734bc383bd376e2c05ffa84eb206df0f3 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sun, 27 Feb 2022 12:41:23 +0100 Subject: [PATCH 4/9] Added experimental memory channels --- giambio/__init__.py | 3 +- giambio/context.py | 19 ++++---- giambio/core.py | 3 ++ giambio/sync.py | 96 ++++++++++++++++++++++++++++++++++++++++- giambio/traps.py | 5 ++- tests/memory_channel.py | 31 +++++++++++++ tests/queue.py | 2 +- tests/task_ipc.py | 33 ++++++++++++++ 8 files changed, 178 insertions(+), 14 deletions(-) create mode 100644 tests/memory_channel.py create mode 100644 tests/task_ipc.py diff --git a/giambio/__init__.py b/giambio/__init__.py index 0ced5c2..02c3062 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -22,7 +22,7 @@ __version__ = (0, 0, 1) from giambio import exceptions, socket, context, core, task, io from giambio.traps import sleep, current_task -from giambio.sync import Event, Queue +from giambio.sync import Event, Queue, MemoryChannel from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after from giambio.util import debug @@ -34,6 +34,7 @@ __all__ = [ "sleep", "Event", "Queue", + "MemoryChannel", "run", "clock", "create_pool", diff --git a/giambio/context.py b/giambio/context.py index abc12f9..f610bf4 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -78,14 +78,17 @@ class TaskManager: all the tasks spawned inside the pool """ - for task in self.tasks: - # This forces the interpreter to stop at the - # end of the block and wait for all - # children to exit - await task.join() - self.tasks.remove(task) - self._proper_init = False - if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout: + try: + for task in self.tasks: + # This forces the interpreter to stop at the + # end of the block and wait for all + # children to exit + await task.join() + self.tasks.remove(task) + self._proper_init = False + if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout: + return True + except giambio.exceptions.TooSlowError: return True async def cancel(self): diff --git a/giambio/core.py b/giambio/core.py index ee48c00..58058fb 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -360,6 +360,7 @@ class AsyncScheduler: if self.current_task.last_io: self.io_release_task(self.current_task) + self.current_task.status = "sleep" self.suspended.append(self.current_task) @@ -728,6 +729,8 @@ class AsyncScheduler: self.io_release_task(task) elif task.status == "sleep": self.paused.discard(task) + if task in self.suspended: + self.suspended.remove(task) try: self.do_cancel(task) except CancelledError as cancel: diff --git a/giambio/sync.py b/giambio/sync.py index 9da7a3a..cc7191c 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -45,7 +45,7 @@ class Event: if self.set: raise GiambioError("The event has already been set") self.value = value - await event_set(self) + await event_set(self, value) async def wait(self): """ @@ -73,6 +73,27 @@ class Queue: self.putters = deque() self.container = deque() + def __len__(self): + """ + Returns the length of the queue + """ + + return len(self.container) + + + async def __aiter__(self): + """ + Implements the iterator protocol + """ + + return self + + async def __anext__(self): + """ + Implements the iterator protocol + """ + + return await self.get() async def put(self, item: Any): """ @@ -81,7 +102,7 @@ class Queue: enough space for the queue """ - if self.maxsize and len(self.container) < self.maxsize: + if not self.maxsize or len(self.container) < self.maxsize: self.container.append(item) if self.getters: await self.getters.popleft().trigger(self.container.popleft()) @@ -106,3 +127,74 @@ class Queue: ev = Event() self.getters.append(ev) return await ev.wait() + + async def clear(self): + """ + Clears the queue + """ + + self.container.clear() + + async def reset(self): + """ + Resets the queue + """ + + await self.clear() + self.getters.clear() + self.putters.clear() + + +class MemoryChannel: + """ + A two-way communication channel between tasks based + on giambio's queueing mechanism. Operations on this + object do not perform any I/O or other system call and + are therefore extremely efficient + """ + + def __init__(self, maxsize: Optional[int] = None): + """ + Public object constructor + """ + + # We use a queue as our buffer + self.buffer = Queue(maxsize=maxsize) + self.maxsize = maxsize + self.closed = False + + + async def write(self, data: str): + """ + Writes data to the channel. Blocks if the internal + queue is full until a spot is available + """ + + if self.closed: + return + await self.buffer.put(data) + + async def read(self): + """ + Reads data from the channel. Blocks until + a message arrives or returns immediately if + one is already waiting + """ + + return await self.buffer.get() + + async def close(self): + """ + Closes the memory channel. Any underlying + data is left for clients to read + """ + + self.closed = True + + async def pending(self): + """ + Returns if there's pending + data to be read + """ + + return bool(len(self.buffer)) \ No newline at end of file diff --git a/giambio/traps.py b/giambio/traps.py index b15a838..2e1e8f6 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -150,7 +150,7 @@ async def cancel(task): """ await create_trap("cancel", task) - assert task.cancelled, f"Task ignored CancelledError" + assert task.done(), f"Task ignored CancelledError" async def want_read(stream): @@ -198,13 +198,14 @@ async def event_wait(event): return await suspend() -async def event_set(event): +async def event_set(event, value): """ Sets the given event and reawakens its waiters """ event.set = True + event.value = value loop = await current_loop() for waiter in event.waiters: loop._data[waiter] = event.value diff --git a/tests/memory_channel.py b/tests/memory_channel.py new file mode 100644 index 0000000..ee2a8e8 --- /dev/null +++ b/tests/memory_channel.py @@ -0,0 +1,31 @@ +import giambio +from debugger import Debugger + + +async def sender(c: giambio.MemoryChannel, n: int): + for i in range(n): + await c.write(str(i)) + print(f"Sent {i}") + await c.close() + print("Sender done") + + +async def receiver(c: giambio.MemoryChannel): + while True: + if not await c.pending() and c.closed: + print("Receiver done") + break + item = await c.read() + print(f"Received {item}") + await giambio.sleep(1) + + +async def main(channel: giambio.MemoryChannel, n: int): + async with giambio.create_pool() as pool: + await pool.spawn(sender, channel, n) + await pool.spawn(receiver, channel) + + + +channel = giambio.MemoryChannel(2) +giambio.run(main, channel, 5, debugger=()) \ No newline at end of file diff --git a/tests/queue.py b/tests/queue.py index 7787bb1..afe761a 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -27,5 +27,5 @@ async def main(q: giambio.Queue, n: int): -queue = giambio.Queue(2) +queue = giambio.Queue() giambio.run(main, queue, 5, debugger=()) diff --git a/tests/task_ipc.py b/tests/task_ipc.py new file mode 100644 index 0000000..79617df --- /dev/null +++ b/tests/task_ipc.py @@ -0,0 +1,33 @@ +import random +import string +import giambio +from debugger import Debugger + + +async def task(c: giambio.MemoryChannel, name: str): + while True: + if await c.pending(): + print(f"[{name}] Received {await c.read()!r}") + else: + data = "".join(random.choice(string.ascii_letters) for _ in range(8)) + print(f"[{name}] Sending {data!r}") + await c.write(data) + await giambio.sleep(1) + + +async def main(channel: giambio.MemoryChannel): + print("[main] Spawning workers") + async with giambio.skip_after(5) as pool: + await pool.spawn(task, channel, "one") + await pool.spawn(task, channel, "two") + await pool.spawn(task, channel, "three") + await channel.close() + print(f"[main] Operation complete, channel closed") + if await channel.pending(): + print(f"[main] Channel has {len(channel.buffer)} leftover packets of data, clearing it") + while await channel.pending(): + print(f"Cleared {await channel.read()!r}") + + +channel = giambio.MemoryChannel() +giambio.run(main, channel, debugger=()) \ No newline at end of file From 800a173aa686733831112c418e9a8f1a6df6ba99 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sun, 27 Feb 2022 13:40:26 +0100 Subject: [PATCH 5/9] Minor change to task_ipc test --- tests/task_ipc.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/task_ipc.py b/tests/task_ipc.py index 79617df..87faf48 100644 --- a/tests/task_ipc.py +++ b/tests/task_ipc.py @@ -15,19 +15,19 @@ async def task(c: giambio.MemoryChannel, name: str): await giambio.sleep(1) -async def main(channel: giambio.MemoryChannel): - print("[main] Spawning workers") - async with giambio.skip_after(5) as pool: +async def main(channel: giambio.MemoryChannel, delay: int): + print(f"[main] Spawning workers, exiting in {delay} seconds") + async with giambio.skip_after(delay) as pool: await pool.spawn(task, channel, "one") await pool.spawn(task, channel, "two") await pool.spawn(task, channel, "three") await channel.close() print(f"[main] Operation complete, channel closed") if await channel.pending(): - print(f"[main] Channel has {len(channel.buffer)} leftover packets of data, clearing it") + print(f"[main] Channel has {len(channel.buffer)} leftover packet{'s' if len(channel.buffer) > 1 else ''} of data, clearing it") while await channel.pending(): - print(f"Cleared {await channel.read()!r}") + print(f"[main] Cleared {await channel.read()!r}") channel = giambio.MemoryChannel() -giambio.run(main, channel, debugger=()) \ No newline at end of file +giambio.run(main, channel, 6, debugger=()) \ No newline at end of file From b8ee9945c132c5d0c45982c33de44580ffc33f27 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sun, 27 Feb 2022 14:01:07 +0100 Subject: [PATCH 6/9] Minor changes --- giambio/__init__.py | 4 ++- giambio/sync.py | 74 +++++++++++++++++++++++++++++++++++++++---- giambio/util/debug.py | 62 ++++++++++++++++++------------------ 3 files changed, 102 insertions(+), 38 deletions(-) diff --git a/giambio/__init__.py b/giambio/__init__.py index 02c3062..520db85 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -22,7 +22,7 @@ __version__ = (0, 0, 1) from giambio import exceptions, socket, context, core, task, io from giambio.traps import sleep, current_task -from giambio.sync import Event, Queue, MemoryChannel +from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after from giambio.util import debug @@ -34,6 +34,8 @@ __all__ = [ "sleep", "Event", "Queue", + "Channel", + "NetworkChannel", "MemoryChannel", "run", "clock", diff --git a/giambio/sync.py b/giambio/sync.py index cc7191c..5641d86 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -15,6 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +from abc import ABC, abstractmethod +import abc from collections import deque from typing import Any, Optional from giambio.traps import event_wait, event_set @@ -145,7 +147,60 @@ class Queue: self.putters.clear() -class MemoryChannel: +class Channel(ABC): + """ + A generic, two-way, full-duplex communication channel between + tasks. This is just an abstract base class! + """ + + def __init__(self, maxsize: Optional[int] = None): + """ + Public object constructor + """ + + self.maxsize = maxsize + self.closed = False + + @abstractmethod + async def write(self, data: str): + """ + Writes data to the channel. Blocks if the internal + queue is full until a spot is available. Does nothing + if the channel has been closed + """ + + return NotImplemented + + @abstractmethod + async def read(self): + """ + Reads data from the channel. Blocks until + a message arrives or returns immediately if + one is already waiting + """ + + return NotImplemented + + @abstractmethod + async def close(self): + """ + Closes the memory channel. Any underlying + data is left for other tasks to read + """ + + return NotImplemented + + @abstractmethod + async def pending(self): + """ + Returns if there's pending + data to be read + """ + + return NotImplemented + + +class MemoryChannel(Channel): """ A two-way communication channel between tasks based on giambio's queueing mechanism. Operations on this @@ -158,16 +213,16 @@ class MemoryChannel: Public object constructor """ + super().__init__(maxsize) # We use a queue as our buffer self.buffer = Queue(maxsize=maxsize) - self.maxsize = maxsize - self.closed = False async def write(self, data: str): """ Writes data to the channel. Blocks if the internal - queue is full until a spot is available + queue is full until a spot is available. Does nothing + if the channel has been closed """ if self.closed: @@ -186,7 +241,7 @@ class MemoryChannel: async def close(self): """ Closes the memory channel. Any underlying - data is left for clients to read + data is left for other tasks to read """ self.closed = True @@ -197,4 +252,11 @@ class MemoryChannel: data to be read """ - return bool(len(self.buffer)) \ No newline at end of file + return bool(len(self.buffer)) + + +class NetworkChannel(Channel): + """ + A two-way communication channel between tasks based + on giambio's I/O mechanisms. This variant of a channel + """ diff --git a/giambio/util/debug.py b/giambio/util/debug.py index f911e20..2bd903d 100644 --- a/giambio/util/debug.py +++ b/giambio/util/debug.py @@ -31,7 +31,7 @@ class BaseDebugger(ABC): loop starts executing """ - raise NotImplementedError + return NotImplemented @abstractmethod def on_exit(self): @@ -40,7 +40,7 @@ class BaseDebugger(ABC): loop exits entirely (all tasks completed) """ - raise NotImplementedError + return NotImplemented @abstractmethod def on_task_schedule(self, task: Task, delay: float): @@ -49,14 +49,14 @@ class BaseDebugger(ABC): scheduled (not spawned) :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task :param delay: The delay, in seconds, after which - the task will start executing + the task will start executing :type delay: float """ - raise NotImplementedError + return NotImplemented @abstractmethod def on_task_spawn(self, task: Task): @@ -65,11 +65,11 @@ class BaseDebugger(ABC): spawned :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def on_task_exit(self, task: Task): @@ -77,11 +77,11 @@ class BaseDebugger(ABC): This method is called when a task exits :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def before_task_step(self, task: Task): @@ -90,11 +90,11 @@ class BaseDebugger(ABC): calling a task's run() method :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def after_task_step(self, task: Task): @@ -103,11 +103,11 @@ class BaseDebugger(ABC): calling a task's run() method :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def before_sleep(self, task: Task, seconds: float): @@ -116,14 +116,14 @@ class BaseDebugger(ABC): to sleep :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task :param seconds: The amount of seconds the - task wants to sleep + task wants to sleep :type seconds: int """ - raise NotImplementedError + return NotImplemented @abstractmethod def after_sleep(self, task: Task, seconds: float): @@ -132,14 +132,14 @@ class BaseDebugger(ABC): awakes from sleeping :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task :param seconds: The amount of seconds the - task actually slept + task actually slept :type seconds: float """ - raise NotImplementedError + return NotImplemented @abstractmethod def before_io(self, timeout: float): @@ -148,12 +148,12 @@ class BaseDebugger(ABC): the event loop checks for I/O events :param timeout: The max. amount of seconds - that the loop will hang when using the select() - system call + that the loop will hang when using the select() + system call :type timeout: float """ - raise NotImplementedError + return NotImplemented @abstractmethod def after_io(self, timeout: float): @@ -162,12 +162,12 @@ class BaseDebugger(ABC): the event loop has checked for I/O events :param timeout: The actual amount of seconds - that the loop has hung when using the select() - system call + that the loop has hung when using the select() + system call :type timeout: float """ - raise NotImplementedError + return NotImplemented @abstractmethod def before_cancel(self, task: Task): @@ -176,11 +176,11 @@ class BaseDebugger(ABC): gets cancelled :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def after_cancel(self, task: Task) -> object: @@ -189,11 +189,11 @@ class BaseDebugger(ABC): gets cancelled :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def on_exception_raised(self, task: Task, exc: BaseException): @@ -202,10 +202,10 @@ class BaseDebugger(ABC): has raised an exception :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task :param exc: The exception that was raised :type exc: BaseException """ - raise NotImplementedError + return NotImplemented From ed6aba490fd087c27e3e4c2b7785862e897a6b74 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sun, 27 Feb 2022 18:14:12 +0100 Subject: [PATCH 7/9] Added two-way proxy example stolen from njsmith and fixed bug with io_release_task being fucking dumb --- giambio/context.py | 10 ++++++--- giambio/core.py | 41 +++++++++++++++++---------------- giambio/io.py | 2 ++ tests/proxy.py | 56 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 87 insertions(+), 22 deletions(-) create mode 100644 tests/proxy.py diff --git a/giambio/context.py b/giambio/context.py index f610bf4..c6fdd21 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -16,6 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. """ +from lib2to3.pgen2.token import OP import types import giambio from typing import List, Optional @@ -54,6 +55,7 @@ class TaskManager: self._proper_init = False self.enclosed_pool: Optional["giambio.context.TaskManager"] = None self.raise_on_timeout: bool = raise_on_timeout + self.entry_point: Optional[Task] = None async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task": """ @@ -70,6 +72,7 @@ class TaskManager: """ self._proper_init = True + self.entry_point = await giambio.traps.current_task() return self async def __aexit__(self, exc_type: Exception, exc: Exception, tb): @@ -89,8 +92,9 @@ class TaskManager: if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout: return True except giambio.exceptions.TooSlowError: - return True - + if not self.raise_on_timeout: + raise + async def cancel(self): """ Cancels the pool entirely, iterating over all @@ -108,4 +112,4 @@ class TaskManager: pool have exited, False otherwise """ - return self._proper_init and all([task.done() for task in self.tasks]) + return self._proper_init and all([task.done() for task in self.tasks]) and (True if not self.enclosed_pool else self.enclosed_pool.done()) diff --git a/giambio/core.py b/giambio/core.py index 58058fb..d2de041 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -341,7 +341,7 @@ class AsyncScheduler: if self.selector.get_map(): for k in filter( - lambda o: o.data == self.current_task, + lambda o: o.data == task, dict(self.selector.get_map()).values(), ): self.io_release(k.fileobj) @@ -358,7 +358,7 @@ class AsyncScheduler: before it's due """ - if self.current_task.last_io: + if self.current_task.last_io or self.current_task.status == "io": self.io_release_task(self.current_task) self.current_task.status = "sleep" self.suspended.append(self.current_task) @@ -441,13 +441,11 @@ class AsyncScheduler: while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock(): pool = self.deadlines.get() pool.timed_out = True - if not pool.tasks and self.current_task is self.entry_point: - self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point))) + self.cancel_pool(pool) for task in pool.tasks: - if not task.done(): - self.paused.discard(task) - self.io_release_task(task) - self.handle_task_exit(task, partial(task.throw, TooSlowError(task))) + self.join(task) + self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point))) + def schedule_tasks(self, tasks: List[Task]): """ @@ -554,9 +552,12 @@ class AsyncScheduler: self.run_ready.append(entry) self.debugger.on_start() if loop: - self.run() - self.has_ran = True - self.debugger.on_exit() + try: + self.run() + finally: + self.has_ran = True + self.close() + self.debugger.on_exit() def cancel_pool(self, pool: TaskManager) -> bool: """ @@ -729,8 +730,8 @@ class AsyncScheduler: self.io_release_task(task) elif task.status == "sleep": self.paused.discard(task) - if task in self.suspended: - self.suspended.remove(task) + if task in self.suspended: + self.suspended.remove(task) try: self.do_cancel(task) except CancelledError as cancel: @@ -747,7 +748,6 @@ class AsyncScheduler: task.cancel_pending = False task.cancelled = True task.status = "cancelled" - self.io_release_task(self.current_task) self.debugger.after_cancel(task) self.tasks.remove(task) else: @@ -758,12 +758,12 @@ class AsyncScheduler: def register_sock(self, sock, evt_type: str): """ Registers the given socket inside the - selector to perform I/0 multiplexing + selector to perform I/O multiplexing :param sock: The socket on which a read or write operation - has to be performed + has to be performed :param evt_type: The type of event to perform on the given - socket, either "read" or "write" + socket, either "read" or "write" :type evt_type: str """ @@ -797,5 +797,8 @@ class AsyncScheduler: try: self.selector.register(sock, evt, self.current_task) except KeyError: - # The socket is already registered doing something else - raise ResourceBusy("The given socket is being read/written by another task") from None + # The socket is already registered doing something else, we + # modify the socket instead (or maybe not?) + self.selector.modify(sock, evt, self.current_task) + # TODO: Does this break stuff? + # raise ResourceBusy("The given socket is being read/written by another task") from None diff --git a/giambio/io.py b/giambio/io.py index a5cb425..95ff009 100644 --- a/giambio/io.py +++ b/giambio/io.py @@ -16,6 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. """ +import giambio from giambio.exceptions import ResourceClosed from giambio.traps import want_write, want_read, io_release @@ -121,6 +122,7 @@ class AsyncSocket: if self.sock: self.sock.shutdown(how) + await giambio.sleep(0) # Checkpoint async def bind(self, addr: tuple): """ diff --git a/tests/proxy.py b/tests/proxy.py new file mode 100644 index 0000000..8d5ff8d --- /dev/null +++ b/tests/proxy.py @@ -0,0 +1,56 @@ +from debugger import Debugger +import giambio +import socket + + +async def proxy_one_way(source: giambio.socket.AsyncSocket, sink: giambio.socket.AsyncSocket): + """ + Sends data from source to sink + """ + + sink_addr = ":".join(map(str, await sink.getpeername())) + source_addr = ":".join(map(str, await source.getpeername())) + while True: + data = await source.receive(1024) + if not data: + print(f"{source_addr} has exited, closing connection to {sink_addr}") + await sink.shutdown(socket.SHUT_WR) + break + print(f"Got {data.decode('utf8', errors='ignore')!r} from {source_addr}, forwarding it to {sink_addr}") + await sink.send_all(data) + + +async def proxy_two_way(a: giambio.socket.AsyncSocket, b: giambio.socket.AsyncSocket): + """ + Sets up a two-way proxy from a to b and from b to a + """ + + async with giambio.create_pool() as pool: + await pool.spawn(proxy_one_way, a, b) + await pool.spawn(proxy_one_way, b, a) + + +async def main(delay: int, a: tuple, b: tuple): + """ + Sets up the proxy + """ + + start = giambio.clock() + print(f"Starting two-way proxy from {a[0]}:{a[1]} to {b[0]}:{b[1]}, lasting for {delay} seconds") + async with giambio.skip_after(delay) as p: + sock_a = giambio.socket.socket() + sock_b = giambio.socket.socket() + await sock_a.connect(a) + await sock_b.connect(b) + async with sock_a, sock_b: + await proxy_two_way(sock_a, sock_b) + print(f"Proxy has exited after {giambio.clock() - start:.2f} seconds") + + +try: + giambio.run(main, 60, ("localhost", 12345), ("localhost", 54321), debugger=()) +except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! + if isinstance(error, KeyboardInterrupt): + print("Ctrl+C detected, exiting") + else: + print(f"Exiting due to a {type(error).__name__}: {error}") \ No newline at end of file From e76998f29fdee9ae8892773fe2b9a8d169e0b3b0 Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Sun, 27 Feb 2022 18:50:57 +0100 Subject: [PATCH 8/9] Added experimental network channels --- giambio/context.py | 2 +- giambio/core.py | 3 +- giambio/sync.py | 69 ++++++++++++++++++++++++++++++++++++++++++++-- tests/task_ipc2.py | 32 +++++++++++++++++++++ 4 files changed, 102 insertions(+), 4 deletions(-) create mode 100644 tests/task_ipc2.py diff --git a/giambio/context.py b/giambio/context.py index c6fdd21..9fe7cfa 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -92,7 +92,7 @@ class TaskManager: if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout: return True except giambio.exceptions.TooSlowError: - if not self.raise_on_timeout: + if self.raise_on_timeout: raise async def cancel(self): diff --git a/giambio/core.py b/giambio/core.py index d2de041..48fb734 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -445,7 +445,8 @@ class AsyncScheduler: for task in pool.tasks: self.join(task) self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point))) - + if pool.entry_point is self.entry_point: + self.run_ready.append(self.entry_point) def schedule_tasks(self, tasks: List[Task]): """ diff --git a/giambio/sync.py b/giambio/sync.py index 5641d86..74a0163 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -15,12 +15,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +from socket import socketpair from abc import ABC, abstractmethod -import abc from collections import deque from typing import Any, Optional from giambio.traps import event_wait, event_set from giambio.exceptions import GiambioError +from giambio.socket import wrap_socket class Event: @@ -258,5 +259,69 @@ class MemoryChannel(Channel): class NetworkChannel(Channel): """ A two-way communication channel between tasks based - on giambio's I/O mechanisms. This variant of a channel + on giambio's I/O mechanisms instead of in-memory queues """ + + def __init__(self): + """ + Public object constructor + """ + + super().__init__(None) + # We use a socket as our buffer instead of a queue + sockets = socketpair() + self.reader = wrap_socket(sockets[0]) + self.writer = wrap_socket(sockets[1]) + self._internal_buffer = b"" + + + async def write(self, data: bytes): + """ + Writes data to the channel. Blocks if the internal + socket is not currently available. Does nothing + if the channel has been closed + """ + + if self.closed: + return + await self.writer.send_all(data) + + async def read(self, size: int): + """ + Reads exactly size bytes from the channel. Blocks until + enough data arrives. Extra data is cached and used on the + next read + """ + + data = self._internal_buffer + while len(data) < size: + data += await self.reader.receive(size) + self._internal_buffer = data[size:] + data = data[:size] + return data + + async def close(self): + """ + Closes the memory channel. Any underlying + data is flushed out of the internal socket + and is lost + """ + + self.closed = True + await self.reader.close() + await self.writer.close() + + async def pending(self): + """ + Returns if there's pending + data to be read + """ + + # TODO: Ugly! + if self.closed: + return False + try: + self._internal_buffer += self.reader.sock.recv(1) + except BlockingIOError: + return False + return True diff --git a/tests/task_ipc2.py b/tests/task_ipc2.py new file mode 100644 index 0000000..673d227 --- /dev/null +++ b/tests/task_ipc2.py @@ -0,0 +1,32 @@ +import random +import string +import giambio +from debugger import Debugger + + +async def task(c: giambio.NetworkChannel, name: str): + while True: + if await c.pending(): + print(f"[{name}] Received {(await c.read(8)).decode()!r}") + else: + data = "".join(random.choice(string.ascii_letters) for _ in range(8)) + print(f"[{name}] Sending {data!r}") + await c.write(data.encode()) + await giambio.sleep(1) + + +async def main(channel: giambio.NetworkChannel, delay: int): + print(f"[main] Spawning workers, exiting in {delay} seconds") + async with giambio.skip_after(delay) as pool: + await pool.spawn(task, channel, "one") + await pool.spawn(task, channel, "two") + await channel.close() + print(f"[main] Operation complete, channel closed") + if await channel.pending(): + print(f"[main] Channel has leftover data, clearing it") + while await channel.pending(): + print(f"[main] Cleared {await channel.read(1)!r}") + + +channel = giambio.NetworkChannel() +giambio.run(main, channel, 4, debugger=()) \ No newline at end of file From ad34be87547aef657361b2b932b653e19ed4130d Mon Sep 17 00:00:00 2001 From: Nocturn9x Date: Tue, 10 May 2022 11:56:47 +0200 Subject: [PATCH 9/9] Added locks --- giambio/__init__.py | 3 +- giambio/core.py | 28 +++++++++++++++---- giambio/runtime.py | 24 ++++++++-------- giambio/sync.py | 67 +++++++++++++++++++++++++++++++++++++++++++-- giambio/traps.py | 2 +- tests/queue.py | 12 ++++++-- tests/task_ipc.py | 1 + tests/timeout2.py | 8 +++--- 8 files changed, 116 insertions(+), 29 deletions(-) diff --git a/giambio/__init__.py b/giambio/__init__.py index 520db85..71989a5 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -22,7 +22,7 @@ __version__ = (0, 0, 1) from giambio import exceptions, socket, context, core, task, io from giambio.traps import sleep, current_task -from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel +from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel, Lock from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after from giambio.util import debug @@ -37,6 +37,7 @@ __all__ = [ "Channel", "NetworkChannel", "MemoryChannel", + "Lock", "run", "clock", "create_pool", diff --git a/giambio/core.py b/giambio/core.py index 48fb734..73270ec 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -130,6 +130,7 @@ class AsyncScheduler: self.entry_point: Optional[Task] = None # Suspended tasks self.suspended: deque = deque() + def __repr__(self): """ @@ -209,7 +210,10 @@ class AsyncScheduler: # after it is set, but it makes the implementation easier if not self.current_pool and self.current_task.pool: self.current_pool = self.current_task.pool - self.deadlines.put(self.current_pool) + pool = self.current_pool + while pool: + self.deadlines.put(pool) + pool = self.current_pool.enclosed_pool # If there are no actively running tasks, we start by # checking for I/O. This method will wait for I/O until # the closest deadline to avoid starving sleeping tasks @@ -288,16 +292,18 @@ class AsyncScheduler: account, that's self.run's job! """ - data = None # Sets the currently running task self.current_task = self.run_ready.popleft() if self.current_task.done(): # We need to make sure we don't try to execute # exited tasks that are on the running queue return - if not self.current_pool and self.current_task.pool: + if not self.current_pool: self.current_pool = self.current_task.pool - self.deadlines.put(self.current_pool) + pool = self.current_pool + while pool: + self.deadlines.put(pool) + pool = self.current_pool.enclosed_pool self.debugger.before_task_step(self.current_task) # Some debugging and internal chatter here self.current_task.status = "run" @@ -363,7 +369,6 @@ class AsyncScheduler: self.current_task.status = "sleep" self.suspended.append(self.current_task) - def reschedule_running(self): """ Reschedules the currently running task @@ -444,8 +449,8 @@ class AsyncScheduler: self.cancel_pool(pool) for task in pool.tasks: self.join(task) - self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point))) if pool.entry_point is self.entry_point: + self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point))) self.run_ready.append(self.entry_point) def schedule_tasks(self, tasks: List[Task]): @@ -480,6 +485,7 @@ class AsyncScheduler: self.run_ready.append(task) self.debugger.after_sleep(task, slept) + def get_closest_deadline(self) -> float: """ Gets the closest expiration deadline (asleep tasks, timeouts) @@ -619,6 +625,16 @@ class AsyncScheduler: elif not self.done(): raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit") self.shutdown() + # We reset the event loop's state + self.tasks = [] + self.entry_point = None + self.current_pool = None + self.current_task = None + self.paused = TimeQueue(self.clock) + self.deadlines = DeadlinesQueue() + self.run_ready = deque() + self.suspended = deque() + def reschedule_joiners(self, task: Task): """ diff --git a/giambio/runtime.py b/giambio/runtime.py index bf14e28..b9217ad 100644 --- a/giambio/runtime.py +++ b/giambio/runtime.py @@ -95,19 +95,19 @@ def create_pool(): return TaskManager() + def with_timeout(timeout: int or float): """ Creates an async pool with an associated timeout """ assert timeout > 0, "The timeout must be greater than 0" - mgr = TaskManager(timeout) loop = get_event_loop() - if loop.current_task.pool is None: - loop.current_pool = mgr - loop.current_task.pool = mgr - loop.current_task.next_deadline = mgr.timeout or 0.0 - loop.deadlines.put(mgr) + mgr = TaskManager(timeout) + if loop.current_task is not loop.entry_point: + mgr.tasks.append(loop.current_task) + if loop.current_pool and loop.current_pool is not mgr: + loop.current_pool.enclosed_pool = mgr return mgr @@ -119,11 +119,11 @@ def skip_after(timeout: int or float): """ assert timeout > 0, "The timeout must be greater than 0" - mgr = TaskManager(timeout, False) loop = get_event_loop() - if loop.current_task.pool is None: - loop.current_pool = mgr - loop.current_task.pool = mgr - loop.current_task.next_deadline = mgr.timeout or 0.0 - loop.deadlines.put(mgr) + mgr = TaskManager(timeout, False) + if loop.current_task is not loop.entry_point: + mgr.tasks.append(loop.current_task) + if loop.current_pool and loop.current_pool is not mgr: + loop.current_pool.enclosed_pool = mgr return mgr + diff --git a/giambio/sync.py b/giambio/sync.py index 74a0163..ee1f600 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -19,9 +19,10 @@ from socket import socketpair from abc import ABC, abstractmethod from collections import deque from typing import Any, Optional -from giambio.traps import event_wait, event_set +from giambio.traps import event_wait, event_set, current_task from giambio.exceptions import GiambioError from giambio.socket import wrap_socket +from giambio.task import Task class Event: @@ -72,7 +73,11 @@ class Queue: """ self.maxsize = maxsize + # Stores event objects for tasks wanting to + # get items from the queue self.getters = deque() + # Stores event objects for tasks wanting to + # put items on the queue self.putters = deque() self.container = deque() @@ -84,16 +89,19 @@ class Queue: return len(self.container) + def __repr__(self) -> str: + return f"{type(self).__name__}({f', '.join(map(str, self.container))})" + async def __aiter__(self): """ - Implements the iterator protocol + Implements the asynchronous iterator protocol """ return self async def __anext__(self): """ - Implements the iterator protocol + Implements the asynchronous iterator protocol """ return await self.get() @@ -325,3 +333,56 @@ class NetworkChannel(Channel): except BlockingIOError: return False return True + + +class Lock: + """ + A simple single-owner lock + """ + + def __init__(self): + """ + Public constructor + """ + + self.owner: Optional[Task] = None + self.tasks: deque[Event] = deque() + + async def acquire(self): + """ + Acquires the lock + """ + + task = await current_task() + if self.owner is None: + self.owner = task + elif task is self.owner: + raise RuntimeError("lock is already acquired by current task") + elif self.owner is not task: + self.tasks.append(Event()) + await self.tasks[-1].wait() + self.owner = task + + async def release(self): + """ + Releases the lock + """ + + task = await current_task() + if self.owner is None: + raise RuntimeError("lock is not acquired") + elif self.owner is not task: + raise RuntimeError("lock can only released by its owner") + elif self.tasks: + await self.tasks.popleft().trigger() + else: + self.owner = None + + + async def __aenter__(self): + await self.acquire() + return self + + + async def __aexit__(self, *args): + await self.release() diff --git a/giambio/traps.py b/giambio/traps.py index 2e1e8f6..f7dd8bf 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -209,4 +209,4 @@ async def event_set(event, value): loop = await current_loop() for waiter in event.waiters: loop._data[waiter] = event.value - await schedule_tasks(event.waiters) + await schedule_tasks(event.waiters) \ No newline at end of file diff --git a/tests/queue.py b/tests/queue.py index afe761a..f1476dc 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -4,6 +4,9 @@ from debugger import Debugger async def producer(q: giambio.Queue, n: int): for i in range(n): + # This will wait until the + # queue is emptied by the + # consumer await q.put(i) print(f"Produced {i}") await q.put(None) @@ -12,11 +15,16 @@ async def producer(q: giambio.Queue, n: int): async def consumer(q: giambio.Queue): while True: + # Hangs until there is + # something on the queue item = await q.get() if item is None: print("Consumer done") break print(f"Consumed {item}") + # Simulates some work so the + # producer waits before putting + # the next value await giambio.sleep(1) @@ -24,8 +32,8 @@ async def main(q: giambio.Queue, n: int): async with giambio.create_pool() as pool: await pool.spawn(producer, q, n) await pool.spawn(consumer, q) - + print("Bye!") -queue = giambio.Queue() +queue = giambio.Queue(1) # Queue has size limit of 1 giambio.run(main, queue, 5, debugger=()) diff --git a/tests/task_ipc.py b/tests/task_ipc.py index 87faf48..64383fa 100644 --- a/tests/task_ipc.py +++ b/tests/task_ipc.py @@ -1,3 +1,4 @@ +## SImple task IPC using giambio's MemoryChannel class import random import string import giambio diff --git a/tests/timeout2.py b/tests/timeout2.py index 42d75fe..b26f2bc 100644 --- a/tests/timeout2.py +++ b/tests/timeout2.py @@ -11,11 +11,11 @@ async def child(name: int): async def main(): start = giambio.clock() async with giambio.skip_after(10) as pool: - await pool.spawn(child, 7) # This will complete - await giambio.sleep(2) # This will make the code below wait 2 seconds + await pool.spawn(child, 7) # This will complete + await giambio.sleep(2) # This will make the code below wait 2 seconds await pool.spawn(child, 15) # This will not complete - await giambio.sleep(50) - await child(20) # Neither will this + await giambio.sleep(50) # Neither will this + await child(20) # Nor this if pool.timed_out: print("[main] One or more children have timed out!") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")