Compare commits

...

5 Commits

Author SHA1 Message Date
Nocturn9x e76998f29f Added experimental network channels 2022-02-27 18:50:57 +01:00
Nocturn9x ed6aba490f Added two-way proxy example stolen from njsmith and fixed bug with io_release_task being fucking dumb 2022-02-27 18:14:12 +01:00
Nocturn9x b8ee9945c1 Minor changes 2022-02-27 14:01:07 +01:00
Nocturn9x 800a173aa6 Minor change to task_ipc test 2022-02-27 13:42:25 +01:00
Nocturn9x 3ec0864734 Added experimental memory channels 2022-02-27 13:39:32 +01:00
12 changed files with 456 additions and 65 deletions

View File

@ -22,7 +22,7 @@ __version__ = (0, 0, 1)
from giambio import exceptions, socket, context, core, task, io
from giambio.traps import sleep, current_task
from giambio.sync import Event, Queue
from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel
from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after
from giambio.util import debug
@ -34,6 +34,9 @@ __all__ = [
"sleep",
"Event",
"Queue",
"Channel",
"NetworkChannel",
"MemoryChannel",
"run",
"clock",
"create_pool",

View File

@ -16,6 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from lib2to3.pgen2.token import OP
import types
import giambio
from typing import List, Optional
@ -54,6 +55,7 @@ class TaskManager:
self._proper_init = False
self.enclosed_pool: Optional["giambio.context.TaskManager"] = None
self.raise_on_timeout: bool = raise_on_timeout
self.entry_point: Optional[Task] = None
async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task":
"""
@ -70,6 +72,7 @@ class TaskManager:
"""
self._proper_init = True
self.entry_point = await giambio.traps.current_task()
return self
async def __aexit__(self, exc_type: Exception, exc: Exception, tb):
@ -78,16 +81,20 @@ class TaskManager:
all the tasks spawned inside the pool
"""
for task in self.tasks:
# This forces the interpreter to stop at the
# end of the block and wait for all
# children to exit
await task.join()
self.tasks.remove(task)
self._proper_init = False
if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout:
return True
try:
for task in self.tasks:
# This forces the interpreter to stop at the
# end of the block and wait for all
# children to exit
await task.join()
self.tasks.remove(task)
self._proper_init = False
if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout:
return True
except giambio.exceptions.TooSlowError:
if self.raise_on_timeout:
raise
async def cancel(self):
"""
Cancels the pool entirely, iterating over all
@ -105,4 +112,4 @@ class TaskManager:
pool have exited, False otherwise
"""
return self._proper_init and all([task.done() for task in self.tasks])
return self._proper_init and all([task.done() for task in self.tasks]) and (True if not self.enclosed_pool else self.enclosed_pool.done())

View File

@ -341,7 +341,7 @@ class AsyncScheduler:
if self.selector.get_map():
for k in filter(
lambda o: o.data == self.current_task,
lambda o: o.data == task,
dict(self.selector.get_map()).values(),
):
self.io_release(k.fileobj)
@ -358,8 +358,9 @@ class AsyncScheduler:
before it's due
"""
if self.current_task.last_io:
if self.current_task.last_io or self.current_task.status == "io":
self.io_release_task(self.current_task)
self.current_task.status = "sleep"
self.suspended.append(self.current_task)
@ -440,13 +441,12 @@ class AsyncScheduler:
while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock():
pool = self.deadlines.get()
pool.timed_out = True
if not pool.tasks and self.current_task is self.entry_point:
self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point)))
self.cancel_pool(pool)
for task in pool.tasks:
if not task.done():
self.paused.discard(task)
self.io_release_task(task)
self.handle_task_exit(task, partial(task.throw, TooSlowError(task)))
self.join(task)
self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point)))
if pool.entry_point is self.entry_point:
self.run_ready.append(self.entry_point)
def schedule_tasks(self, tasks: List[Task]):
"""
@ -553,9 +553,12 @@ class AsyncScheduler:
self.run_ready.append(entry)
self.debugger.on_start()
if loop:
self.run()
self.has_ran = True
self.debugger.on_exit()
try:
self.run()
finally:
self.has_ran = True
self.close()
self.debugger.on_exit()
def cancel_pool(self, pool: TaskManager) -> bool:
"""
@ -728,6 +731,8 @@ class AsyncScheduler:
self.io_release_task(task)
elif task.status == "sleep":
self.paused.discard(task)
if task in self.suspended:
self.suspended.remove(task)
try:
self.do_cancel(task)
except CancelledError as cancel:
@ -744,7 +749,6 @@ class AsyncScheduler:
task.cancel_pending = False
task.cancelled = True
task.status = "cancelled"
self.io_release_task(self.current_task)
self.debugger.after_cancel(task)
self.tasks.remove(task)
else:
@ -755,12 +759,12 @@ class AsyncScheduler:
def register_sock(self, sock, evt_type: str):
"""
Registers the given socket inside the
selector to perform I/0 multiplexing
selector to perform I/O multiplexing
:param sock: The socket on which a read or write operation
has to be performed
has to be performed
:param evt_type: The type of event to perform on the given
socket, either "read" or "write"
socket, either "read" or "write"
:type evt_type: str
"""
@ -794,5 +798,8 @@ class AsyncScheduler:
try:
self.selector.register(sock, evt, self.current_task)
except KeyError:
# The socket is already registered doing something else
raise ResourceBusy("The given socket is being read/written by another task") from None
# The socket is already registered doing something else, we
# modify the socket instead (or maybe not?)
self.selector.modify(sock, evt, self.current_task)
# TODO: Does this break stuff?
# raise ResourceBusy("The given socket is being read/written by another task") from None

View File

@ -16,6 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import giambio
from giambio.exceptions import ResourceClosed
from giambio.traps import want_write, want_read, io_release
@ -121,6 +122,7 @@ class AsyncSocket:
if self.sock:
self.sock.shutdown(how)
await giambio.sleep(0) # Checkpoint
async def bind(self, addr: tuple):
"""

View File

@ -15,10 +15,13 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from socket import socketpair
from abc import ABC, abstractmethod
from collections import deque
from typing import Any, Optional
from giambio.traps import event_wait, event_set
from giambio.exceptions import GiambioError
from giambio.socket import wrap_socket
class Event:
@ -45,7 +48,7 @@ class Event:
if self.set:
raise GiambioError("The event has already been set")
self.value = value
await event_set(self)
await event_set(self, value)
async def wait(self):
"""
@ -73,6 +76,27 @@ class Queue:
self.putters = deque()
self.container = deque()
def __len__(self):
"""
Returns the length of the queue
"""
return len(self.container)
async def __aiter__(self):
"""
Implements the iterator protocol
"""
return self
async def __anext__(self):
"""
Implements the iterator protocol
"""
return await self.get()
async def put(self, item: Any):
"""
@ -81,7 +105,7 @@ class Queue:
enough space for the queue
"""
if self.maxsize and len(self.container) < self.maxsize:
if not self.maxsize or len(self.container) < self.maxsize:
self.container.append(item)
if self.getters:
await self.getters.popleft().trigger(self.container.popleft())
@ -106,3 +130,198 @@ class Queue:
ev = Event()
self.getters.append(ev)
return await ev.wait()
async def clear(self):
"""
Clears the queue
"""
self.container.clear()
async def reset(self):
"""
Resets the queue
"""
await self.clear()
self.getters.clear()
self.putters.clear()
class Channel(ABC):
"""
A generic, two-way, full-duplex communication channel between
tasks. This is just an abstract base class!
"""
def __init__(self, maxsize: Optional[int] = None):
"""
Public object constructor
"""
self.maxsize = maxsize
self.closed = False
@abstractmethod
async def write(self, data: str):
"""
Writes data to the channel. Blocks if the internal
queue is full until a spot is available. Does nothing
if the channel has been closed
"""
return NotImplemented
@abstractmethod
async def read(self):
"""
Reads data from the channel. Blocks until
a message arrives or returns immediately if
one is already waiting
"""
return NotImplemented
@abstractmethod
async def close(self):
"""
Closes the memory channel. Any underlying
data is left for other tasks to read
"""
return NotImplemented
@abstractmethod
async def pending(self):
"""
Returns if there's pending
data to be read
"""
return NotImplemented
class MemoryChannel(Channel):
"""
A two-way communication channel between tasks based
on giambio's queueing mechanism. Operations on this
object do not perform any I/O or other system call and
are therefore extremely efficient
"""
def __init__(self, maxsize: Optional[int] = None):
"""
Public object constructor
"""
super().__init__(maxsize)
# We use a queue as our buffer
self.buffer = Queue(maxsize=maxsize)
async def write(self, data: str):
"""
Writes data to the channel. Blocks if the internal
queue is full until a spot is available. Does nothing
if the channel has been closed
"""
if self.closed:
return
await self.buffer.put(data)
async def read(self):
"""
Reads data from the channel. Blocks until
a message arrives or returns immediately if
one is already waiting
"""
return await self.buffer.get()
async def close(self):
"""
Closes the memory channel. Any underlying
data is left for other tasks to read
"""
self.closed = True
async def pending(self):
"""
Returns if there's pending
data to be read
"""
return bool(len(self.buffer))
class NetworkChannel(Channel):
"""
A two-way communication channel between tasks based
on giambio's I/O mechanisms instead of in-memory queues
"""
def __init__(self):
"""
Public object constructor
"""
super().__init__(None)
# We use a socket as our buffer instead of a queue
sockets = socketpair()
self.reader = wrap_socket(sockets[0])
self.writer = wrap_socket(sockets[1])
self._internal_buffer = b""
async def write(self, data: bytes):
"""
Writes data to the channel. Blocks if the internal
socket is not currently available. Does nothing
if the channel has been closed
"""
if self.closed:
return
await self.writer.send_all(data)
async def read(self, size: int):
"""
Reads exactly size bytes from the channel. Blocks until
enough data arrives. Extra data is cached and used on the
next read
"""
data = self._internal_buffer
while len(data) < size:
data += await self.reader.receive(size)
self._internal_buffer = data[size:]
data = data[:size]
return data
async def close(self):
"""
Closes the memory channel. Any underlying
data is flushed out of the internal socket
and is lost
"""
self.closed = True
await self.reader.close()
await self.writer.close()
async def pending(self):
"""
Returns if there's pending
data to be read
"""
# TODO: Ugly!
if self.closed:
return False
try:
self._internal_buffer += self.reader.sock.recv(1)
except BlockingIOError:
return False
return True

View File

@ -150,7 +150,7 @@ async def cancel(task):
"""
await create_trap("cancel", task)
assert task.cancelled, f"Task ignored CancelledError"
assert task.done(), f"Task ignored CancelledError"
async def want_read(stream):
@ -198,13 +198,14 @@ async def event_wait(event):
return await suspend()
async def event_set(event):
async def event_set(event, value):
"""
Sets the given event and reawakens its
waiters
"""
event.set = True
event.value = value
loop = await current_loop()
for waiter in event.waiters:
loop._data[waiter] = event.value

View File

@ -31,7 +31,7 @@ class BaseDebugger(ABC):
loop starts executing
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_exit(self):
@ -40,7 +40,7 @@ class BaseDebugger(ABC):
loop exits entirely (all tasks completed)
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_task_schedule(self, task: Task, delay: float):
@ -49,14 +49,14 @@ class BaseDebugger(ABC):
scheduled (not spawned)
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param delay: The delay, in seconds, after which
the task will start executing
the task will start executing
:type delay: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_task_spawn(self, task: Task):
@ -65,11 +65,11 @@ class BaseDebugger(ABC):
spawned
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_task_exit(self, task: Task):
@ -77,11 +77,11 @@ class BaseDebugger(ABC):
This method is called when a task exits
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_task_step(self, task: Task):
@ -90,11 +90,11 @@ class BaseDebugger(ABC):
calling a task's run() method
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_task_step(self, task: Task):
@ -103,11 +103,11 @@ class BaseDebugger(ABC):
calling a task's run() method
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_sleep(self, task: Task, seconds: float):
@ -116,14 +116,14 @@ class BaseDebugger(ABC):
to sleep
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param seconds: The amount of seconds the
task wants to sleep
task wants to sleep
:type seconds: int
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_sleep(self, task: Task, seconds: float):
@ -132,14 +132,14 @@ class BaseDebugger(ABC):
awakes from sleeping
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param seconds: The amount of seconds the
task actually slept
task actually slept
:type seconds: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_io(self, timeout: float):
@ -148,12 +148,12 @@ class BaseDebugger(ABC):
the event loop checks for I/O events
:param timeout: The max. amount of seconds
that the loop will hang when using the select()
system call
that the loop will hang when using the select()
system call
:type timeout: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_io(self, timeout: float):
@ -162,12 +162,12 @@ class BaseDebugger(ABC):
the event loop has checked for I/O events
:param timeout: The actual amount of seconds
that the loop has hung when using the select()
system call
that the loop has hung when using the select()
system call
:type timeout: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_cancel(self, task: Task):
@ -176,11 +176,11 @@ class BaseDebugger(ABC):
gets cancelled
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_cancel(self, task: Task) -> object:
@ -189,11 +189,11 @@ class BaseDebugger(ABC):
gets cancelled
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_exception_raised(self, task: Task, exc: BaseException):
@ -202,10 +202,10 @@ class BaseDebugger(ABC):
has raised an exception
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param exc: The exception that was raised
:type exc: BaseException
"""
raise NotImplementedError
return NotImplemented

31
tests/memory_channel.py Normal file
View File

@ -0,0 +1,31 @@
import giambio
from debugger import Debugger
async def sender(c: giambio.MemoryChannel, n: int):
for i in range(n):
await c.write(str(i))
print(f"Sent {i}")
await c.close()
print("Sender done")
async def receiver(c: giambio.MemoryChannel):
while True:
if not await c.pending() and c.closed:
print("Receiver done")
break
item = await c.read()
print(f"Received {item}")
await giambio.sleep(1)
async def main(channel: giambio.MemoryChannel, n: int):
async with giambio.create_pool() as pool:
await pool.spawn(sender, channel, n)
await pool.spawn(receiver, channel)
channel = giambio.MemoryChannel(2)
giambio.run(main, channel, 5, debugger=())

56
tests/proxy.py Normal file
View File

@ -0,0 +1,56 @@
from debugger import Debugger
import giambio
import socket
async def proxy_one_way(source: giambio.socket.AsyncSocket, sink: giambio.socket.AsyncSocket):
"""
Sends data from source to sink
"""
sink_addr = ":".join(map(str, await sink.getpeername()))
source_addr = ":".join(map(str, await source.getpeername()))
while True:
data = await source.receive(1024)
if not data:
print(f"{source_addr} has exited, closing connection to {sink_addr}")
await sink.shutdown(socket.SHUT_WR)
break
print(f"Got {data.decode('utf8', errors='ignore')!r} from {source_addr}, forwarding it to {sink_addr}")
await sink.send_all(data)
async def proxy_two_way(a: giambio.socket.AsyncSocket, b: giambio.socket.AsyncSocket):
"""
Sets up a two-way proxy from a to b and from b to a
"""
async with giambio.create_pool() as pool:
await pool.spawn(proxy_one_way, a, b)
await pool.spawn(proxy_one_way, b, a)
async def main(delay: int, a: tuple, b: tuple):
"""
Sets up the proxy
"""
start = giambio.clock()
print(f"Starting two-way proxy from {a[0]}:{a[1]} to {b[0]}:{b[1]}, lasting for {delay} seconds")
async with giambio.skip_after(delay) as p:
sock_a = giambio.socket.socket()
sock_b = giambio.socket.socket()
await sock_a.connect(a)
await sock_b.connect(b)
async with sock_a, sock_b:
await proxy_two_way(sock_a, sock_b)
print(f"Proxy has exited after {giambio.clock() - start:.2f} seconds")
try:
giambio.run(main, 60, ("localhost", 12345), ("localhost", 54321), debugger=())
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
if isinstance(error, KeyboardInterrupt):
print("Ctrl+C detected, exiting")
else:
print(f"Exiting due to a {type(error).__name__}: {error}")

View File

@ -27,5 +27,5 @@ async def main(q: giambio.Queue, n: int):
queue = giambio.Queue(2)
queue = giambio.Queue()
giambio.run(main, queue, 5, debugger=())

33
tests/task_ipc.py Normal file
View File

@ -0,0 +1,33 @@
import random
import string
import giambio
from debugger import Debugger
async def task(c: giambio.MemoryChannel, name: str):
while True:
if await c.pending():
print(f"[{name}] Received {await c.read()!r}")
else:
data = "".join(random.choice(string.ascii_letters) for _ in range(8))
print(f"[{name}] Sending {data!r}")
await c.write(data)
await giambio.sleep(1)
async def main(channel: giambio.MemoryChannel, delay: int):
print(f"[main] Spawning workers, exiting in {delay} seconds")
async with giambio.skip_after(delay) as pool:
await pool.spawn(task, channel, "one")
await pool.spawn(task, channel, "two")
await pool.spawn(task, channel, "three")
await channel.close()
print(f"[main] Operation complete, channel closed")
if await channel.pending():
print(f"[main] Channel has {len(channel.buffer)} leftover packet{'s' if len(channel.buffer) > 1 else ''} of data, clearing it")
while await channel.pending():
print(f"[main] Cleared {await channel.read()!r}")
channel = giambio.MemoryChannel()
giambio.run(main, channel, 6, debugger=())

32
tests/task_ipc2.py Normal file
View File

@ -0,0 +1,32 @@
import random
import string
import giambio
from debugger import Debugger
async def task(c: giambio.NetworkChannel, name: str):
while True:
if await c.pending():
print(f"[{name}] Received {(await c.read(8)).decode()!r}")
else:
data = "".join(random.choice(string.ascii_letters) for _ in range(8))
print(f"[{name}] Sending {data!r}")
await c.write(data.encode())
await giambio.sleep(1)
async def main(channel: giambio.NetworkChannel, delay: int):
print(f"[main] Spawning workers, exiting in {delay} seconds")
async with giambio.skip_after(delay) as pool:
await pool.spawn(task, channel, "one")
await pool.spawn(task, channel, "two")
await channel.close()
print(f"[main] Operation complete, channel closed")
if await channel.pending():
print(f"[main] Channel has leftover data, clearing it")
while await channel.pending():
print(f"[main] Cleared {await channel.read(1)!r}")
channel = giambio.NetworkChannel()
giambio.run(main, channel, 4, debugger=())