Minor changes

This commit is contained in:
Nocturn9x 2022-02-27 14:01:07 +01:00
parent 800a173aa6
commit b8ee9945c1
3 changed files with 102 additions and 38 deletions

View File

@ -22,7 +22,7 @@ __version__ = (0, 0, 1)
from giambio import exceptions, socket, context, core, task, io
from giambio.traps import sleep, current_task
from giambio.sync import Event, Queue, MemoryChannel
from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel
from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after
from giambio.util import debug
@ -34,6 +34,8 @@ __all__ = [
"sleep",
"Event",
"Queue",
"Channel",
"NetworkChannel",
"MemoryChannel",
"run",
"clock",

View File

@ -15,6 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from abc import ABC, abstractmethod
import abc
from collections import deque
from typing import Any, Optional
from giambio.traps import event_wait, event_set
@ -145,7 +147,60 @@ class Queue:
self.putters.clear()
class MemoryChannel:
class Channel(ABC):
"""
A generic, two-way, full-duplex communication channel between
tasks. This is just an abstract base class!
"""
def __init__(self, maxsize: Optional[int] = None):
"""
Public object constructor
"""
self.maxsize = maxsize
self.closed = False
@abstractmethod
async def write(self, data: str):
"""
Writes data to the channel. Blocks if the internal
queue is full until a spot is available. Does nothing
if the channel has been closed
"""
return NotImplemented
@abstractmethod
async def read(self):
"""
Reads data from the channel. Blocks until
a message arrives or returns immediately if
one is already waiting
"""
return NotImplemented
@abstractmethod
async def close(self):
"""
Closes the memory channel. Any underlying
data is left for other tasks to read
"""
return NotImplemented
@abstractmethod
async def pending(self):
"""
Returns if there's pending
data to be read
"""
return NotImplemented
class MemoryChannel(Channel):
"""
A two-way communication channel between tasks based
on giambio's queueing mechanism. Operations on this
@ -158,16 +213,16 @@ class MemoryChannel:
Public object constructor
"""
super().__init__(maxsize)
# We use a queue as our buffer
self.buffer = Queue(maxsize=maxsize)
self.maxsize = maxsize
self.closed = False
async def write(self, data: str):
"""
Writes data to the channel. Blocks if the internal
queue is full until a spot is available
queue is full until a spot is available. Does nothing
if the channel has been closed
"""
if self.closed:
@ -186,7 +241,7 @@ class MemoryChannel:
async def close(self):
"""
Closes the memory channel. Any underlying
data is left for clients to read
data is left for other tasks to read
"""
self.closed = True
@ -197,4 +252,11 @@ class MemoryChannel:
data to be read
"""
return bool(len(self.buffer))
return bool(len(self.buffer))
class NetworkChannel(Channel):
"""
A two-way communication channel between tasks based
on giambio's I/O mechanisms. This variant of a channel
"""

View File

@ -31,7 +31,7 @@ class BaseDebugger(ABC):
loop starts executing
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_exit(self):
@ -40,7 +40,7 @@ class BaseDebugger(ABC):
loop exits entirely (all tasks completed)
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_task_schedule(self, task: Task, delay: float):
@ -49,14 +49,14 @@ class BaseDebugger(ABC):
scheduled (not spawned)
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param delay: The delay, in seconds, after which
the task will start executing
the task will start executing
:type delay: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_task_spawn(self, task: Task):
@ -65,11 +65,11 @@ class BaseDebugger(ABC):
spawned
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_task_exit(self, task: Task):
@ -77,11 +77,11 @@ class BaseDebugger(ABC):
This method is called when a task exits
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_task_step(self, task: Task):
@ -90,11 +90,11 @@ class BaseDebugger(ABC):
calling a task's run() method
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_task_step(self, task: Task):
@ -103,11 +103,11 @@ class BaseDebugger(ABC):
calling a task's run() method
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_sleep(self, task: Task, seconds: float):
@ -116,14 +116,14 @@ class BaseDebugger(ABC):
to sleep
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param seconds: The amount of seconds the
task wants to sleep
task wants to sleep
:type seconds: int
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_sleep(self, task: Task, seconds: float):
@ -132,14 +132,14 @@ class BaseDebugger(ABC):
awakes from sleeping
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param seconds: The amount of seconds the
task actually slept
task actually slept
:type seconds: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_io(self, timeout: float):
@ -148,12 +148,12 @@ class BaseDebugger(ABC):
the event loop checks for I/O events
:param timeout: The max. amount of seconds
that the loop will hang when using the select()
system call
that the loop will hang when using the select()
system call
:type timeout: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_io(self, timeout: float):
@ -162,12 +162,12 @@ class BaseDebugger(ABC):
the event loop has checked for I/O events
:param timeout: The actual amount of seconds
that the loop has hung when using the select()
system call
that the loop has hung when using the select()
system call
:type timeout: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_cancel(self, task: Task):
@ -176,11 +176,11 @@ class BaseDebugger(ABC):
gets cancelled
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_cancel(self, task: Task) -> object:
@ -189,11 +189,11 @@ class BaseDebugger(ABC):
gets cancelled
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_exception_raised(self, task: Task, exc: BaseException):
@ -202,10 +202,10 @@ class BaseDebugger(ABC):
has raised an exception
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param exc: The exception that was raised
:type exc: BaseException
"""
raise NotImplementedError
return NotImplemented