diff --git a/giambio/__init__.py b/giambio/__init__.py index 02c3062..520db85 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -22,7 +22,7 @@ __version__ = (0, 0, 1) from giambio import exceptions, socket, context, core, task, io from giambio.traps import sleep, current_task -from giambio.sync import Event, Queue, MemoryChannel +from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after from giambio.util import debug @@ -34,6 +34,8 @@ __all__ = [ "sleep", "Event", "Queue", + "Channel", + "NetworkChannel", "MemoryChannel", "run", "clock", diff --git a/giambio/sync.py b/giambio/sync.py index cc7191c..5641d86 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -15,6 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +from abc import ABC, abstractmethod +import abc from collections import deque from typing import Any, Optional from giambio.traps import event_wait, event_set @@ -145,7 +147,60 @@ class Queue: self.putters.clear() -class MemoryChannel: +class Channel(ABC): + """ + A generic, two-way, full-duplex communication channel between + tasks. This is just an abstract base class! + """ + + def __init__(self, maxsize: Optional[int] = None): + """ + Public object constructor + """ + + self.maxsize = maxsize + self.closed = False + + @abstractmethod + async def write(self, data: str): + """ + Writes data to the channel. Blocks if the internal + queue is full until a spot is available. Does nothing + if the channel has been closed + """ + + return NotImplemented + + @abstractmethod + async def read(self): + """ + Reads data from the channel. Blocks until + a message arrives or returns immediately if + one is already waiting + """ + + return NotImplemented + + @abstractmethod + async def close(self): + """ + Closes the memory channel. Any underlying + data is left for other tasks to read + """ + + return NotImplemented + + @abstractmethod + async def pending(self): + """ + Returns if there's pending + data to be read + """ + + return NotImplemented + + +class MemoryChannel(Channel): """ A two-way communication channel between tasks based on giambio's queueing mechanism. Operations on this @@ -158,16 +213,16 @@ class MemoryChannel: Public object constructor """ + super().__init__(maxsize) # We use a queue as our buffer self.buffer = Queue(maxsize=maxsize) - self.maxsize = maxsize - self.closed = False async def write(self, data: str): """ Writes data to the channel. Blocks if the internal - queue is full until a spot is available + queue is full until a spot is available. Does nothing + if the channel has been closed """ if self.closed: @@ -186,7 +241,7 @@ class MemoryChannel: async def close(self): """ Closes the memory channel. Any underlying - data is left for clients to read + data is left for other tasks to read """ self.closed = True @@ -197,4 +252,11 @@ class MemoryChannel: data to be read """ - return bool(len(self.buffer)) \ No newline at end of file + return bool(len(self.buffer)) + + +class NetworkChannel(Channel): + """ + A two-way communication channel between tasks based + on giambio's I/O mechanisms. This variant of a channel + """ diff --git a/giambio/util/debug.py b/giambio/util/debug.py index f911e20..2bd903d 100644 --- a/giambio/util/debug.py +++ b/giambio/util/debug.py @@ -31,7 +31,7 @@ class BaseDebugger(ABC): loop starts executing """ - raise NotImplementedError + return NotImplemented @abstractmethod def on_exit(self): @@ -40,7 +40,7 @@ class BaseDebugger(ABC): loop exits entirely (all tasks completed) """ - raise NotImplementedError + return NotImplemented @abstractmethod def on_task_schedule(self, task: Task, delay: float): @@ -49,14 +49,14 @@ class BaseDebugger(ABC): scheduled (not spawned) :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task :param delay: The delay, in seconds, after which - the task will start executing + the task will start executing :type delay: float """ - raise NotImplementedError + return NotImplemented @abstractmethod def on_task_spawn(self, task: Task): @@ -65,11 +65,11 @@ class BaseDebugger(ABC): spawned :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def on_task_exit(self, task: Task): @@ -77,11 +77,11 @@ class BaseDebugger(ABC): This method is called when a task exits :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def before_task_step(self, task: Task): @@ -90,11 +90,11 @@ class BaseDebugger(ABC): calling a task's run() method :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def after_task_step(self, task: Task): @@ -103,11 +103,11 @@ class BaseDebugger(ABC): calling a task's run() method :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def before_sleep(self, task: Task, seconds: float): @@ -116,14 +116,14 @@ class BaseDebugger(ABC): to sleep :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task :param seconds: The amount of seconds the - task wants to sleep + task wants to sleep :type seconds: int """ - raise NotImplementedError + return NotImplemented @abstractmethod def after_sleep(self, task: Task, seconds: float): @@ -132,14 +132,14 @@ class BaseDebugger(ABC): awakes from sleeping :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task :param seconds: The amount of seconds the - task actually slept + task actually slept :type seconds: float """ - raise NotImplementedError + return NotImplemented @abstractmethod def before_io(self, timeout: float): @@ -148,12 +148,12 @@ class BaseDebugger(ABC): the event loop checks for I/O events :param timeout: The max. amount of seconds - that the loop will hang when using the select() - system call + that the loop will hang when using the select() + system call :type timeout: float """ - raise NotImplementedError + return NotImplemented @abstractmethod def after_io(self, timeout: float): @@ -162,12 +162,12 @@ class BaseDebugger(ABC): the event loop has checked for I/O events :param timeout: The actual amount of seconds - that the loop has hung when using the select() - system call + that the loop has hung when using the select() + system call :type timeout: float """ - raise NotImplementedError + return NotImplemented @abstractmethod def before_cancel(self, task: Task): @@ -176,11 +176,11 @@ class BaseDebugger(ABC): gets cancelled :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def after_cancel(self, task: Task) -> object: @@ -189,11 +189,11 @@ class BaseDebugger(ABC): gets cancelled :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task """ - raise NotImplementedError + return NotImplemented @abstractmethod def on_exception_raised(self, task: Task, exc: BaseException): @@ -202,10 +202,10 @@ class BaseDebugger(ABC): has raised an exception :param task: The Task object representing a - giambio Task and wrapping a coroutine + giambio Task and wrapping a coroutine :type task: :class: giambio.objects.Task :param exc: The exception that was raised :type exc: BaseException """ - raise NotImplementedError + return NotImplemented