Merge remote-tracking branch 'origin/master'

# Conflicts:
#	giambio/context.py
#	giambio/core.py
#	giambio/runtime.py
#	giambio/sync.py
#	tests/queue.py
#	tests/timeout.py
Mattia Giambirtone 2022-05-14 11:16:59 +02:00
commit 07b6621796
17 changed files with 632 additions and 135 deletions

giambio/__init__.py

@ -22,7 +22,7 @@ __version__ = (0, 0, 1)
from giambio import exceptions, socket, context, core, task, io
from giambio.traps import sleep, current_task
from giambio.sync import Event, Queue
from giambio.sync import Event, Queue, Channel, MemoryChannel, NetworkChannel, Lock
from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after
from giambio.util import debug
@ -34,6 +34,10 @@ __all__ = [
"sleep",
"Event",
"Queue",
"Channel",
"NetworkChannel",
"MemoryChannel",
"Lock",
"run",
"clock",
"create_pool",

giambio/context.py

@ -16,8 +16,10 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from lib2to3.pgen2.token import OP
import types
import giambio
from typing import List, Optional, Callable, Coroutine, Any
from typing import List, Optional
class TaskManager:
@ -53,8 +55,9 @@ class TaskManager:
self._proper_init = False
self.enclosed_pool: Optional["giambio.context.TaskManager"] = None
self.raise_on_timeout: bool = raise_on_timeout
self.entry_point: Optional[Task] = None
async def spawn(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs) -> "giambio.task.Task":
async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task":
"""
Spawns a child task
"""
@ -69,6 +72,7 @@ class TaskManager:
"""
self._proper_init = True
self.entry_point = await giambio.traps.current_task()
return self
async def __aexit__(self, exc_type: Exception, exc: Exception, tb):
@ -77,16 +81,20 @@ class TaskManager:
all the tasks spawned inside the pool
"""
for task in self.tasks:
# This forces the interpreter to stop at the
# end of the block and wait for all
# children to exit
await task.join()
self.tasks.remove(task)
self._proper_init = False
if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout:
return True
try:
for task in self.tasks:
# This forces the interpreter to stop at the
# end of the block and wait for all
# children to exit
await task.join()
self.tasks.remove(task)
self._proper_init = False
if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout:
return True
except giambio.exceptions.TooSlowError:
if self.raise_on_timeout:
raise
async def cancel(self):
"""
Cancels the pool entirely, iterating over all
@ -104,4 +112,4 @@ class TaskManager:
pool have exited, False otherwise
"""
return self._proper_init and all([task.done() for task in self.tasks])
return self._proper_init and all([task.done() for task in self.tasks]) and (True if not self.enclosed_pool else self.enclosed_pool.done())
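For reference, the pool semantics this file implements are exercised throughout the tests below: the async with block only exits once every spawned child has been joined. A minimal usage sketch (the child/main names are illustrative, not part of this diff):

import giambio


async def child(n: int):
    await giambio.sleep(n)
    print(f"child slept for {n} seconds")


async def main():
    # __aexit__ joins both children, so this block
    # only finishes after the slower child is done
    async with giambio.create_pool() as pool:
        await pool.spawn(child, 1)
        await pool.spawn(child, 2)
    print("all children have exited")


giambio.run(main)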

giambio/core.py

@ -17,12 +17,13 @@ limitations under the License.
"""
# Import libraries and internal resources
import types
from giambio.task import Task
from collections import deque
from functools import partial
from timeit import default_timer
from giambio.context import TaskManager
from typing import Callable, List, Optional, Any, Dict, Coroutine
from typing import Callable, List, Optional, Any, Dict
from giambio.util.debug import BaseDebugger
from giambio.internal import TimeQueue, DeadlinesQueue
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
@ -55,7 +56,7 @@ class AsyncScheduler:
:param clock: A callable returning monotonically increasing values at each call,
usually using seconds as units, but this is not enforced, defaults to timeit.default_timer
:type clock: :class: Callable
:type clock: :class: types.FunctionType
:param debugger: A subclass of giambio.util.BaseDebugger or None if no debugging output
is desired, defaults to None
:type debugger: :class: giambio.util.BaseDebugger
@ -72,7 +73,7 @@ class AsyncScheduler:
def __init__(
self,
clock: Callable = default_timer,
clock: types.FunctionType = default_timer,
debugger: Optional[BaseDebugger] = None,
selector: Optional[Any] = None,
io_skip_limit: Optional[int] = None,
@ -94,7 +95,7 @@ class AsyncScheduler:
or type(
"DumbDebugger",
(object,),
{"__getattr__": lambda *args: lambda *arg: None},
{"__getattr__": lambda *_: lambda *_: None},
)()
)
# All tasks the loop has
@ -106,7 +107,7 @@ class AsyncScheduler:
# This will always point to the currently running coroutine (Task object)
self.current_task: Optional[Task] = None
# Monotonic clock to keep track of elapsed time reliably
self.clock: Callable = clock
self.clock: types.FunctionType = clock
# Tasks that are asleep
self.paused: TimeQueue = TimeQueue(self.clock)
# Have we ever ran?
@ -129,6 +130,7 @@ class AsyncScheduler:
self.entry_point: Optional[Task] = None
# Suspended tasks
self.suspended: deque = deque()
def __repr__(self):
"""
@ -150,6 +152,8 @@ class AsyncScheduler:
"_data",
"io_skip_limit",
"io_max_timeout",
"suspended",
"entry_point"
}
data = ", ".join(
name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields))
@ -206,7 +210,10 @@ class AsyncScheduler:
# after it is set, but it makes the implementation easier
if not self.current_pool and self.current_task.pool:
self.current_pool = self.current_task.pool
self.deadlines.put(self.current_pool)
pool = self.current_pool
while pool:
self.deadlines.put(pool)
pool = self.current_pool.enclosed_pool
# If there are no actively running tasks, we start by
# checking for I/O. This method will wait for I/O until
# the closest deadline to avoid starving sleeping tasks
@ -245,7 +252,8 @@ class AsyncScheduler:
self.current_task.exc = err
self.join(self.current_task)
def create_task(self, corofunc: Callable[..., Coroutine[Any, Any, Any]], pool, *args, **kwargs) -> Task:
def create_task(self, corofunc: types.FunctionType, pool, *args, **kwargs) -> Task:
"""
Creates a task from a coroutine function and schedules it
to run. The associated pool that spawned said task is also
@ -290,9 +298,12 @@ class AsyncScheduler:
# We need to make sure we don't try to execute
# exited tasks that are on the running queue
return
if not self.current_pool and self.current_task.pool:
if not self.current_pool:
self.current_pool = self.current_task.pool
self.deadlines.put(self.current_pool)
pool = self.current_pool
while pool:
self.deadlines.put(pool)
pool = self.current_pool.enclosed_pool
self.debugger.before_task_step(self.current_task)
# Some debugging and internal chatter here
self.current_task.status = "run"
@ -336,7 +347,7 @@ class AsyncScheduler:
if self.selector.get_map():
for k in filter(
lambda o: o.data == self.current_task,
lambda o: o.data == task,
dict(self.selector.get_map()).values(),
):
self.io_release(k.fileobj)
@ -348,13 +359,14 @@ class AsyncScheduler:
a do-nothing method, since it will not reschedule the task
before returning. The task will stay suspended as long as
something else outside the loop calls a trap to reschedule it.
Any pending I/O for the task is temporarily unscheduled to
Any pending I/O for the task is temporarily unscheduled to
avoid some previous network operation to reschedule the task
before it's due
"""
if self.current_task.last_io:
if self.current_task.last_io or self.current_task.status == "io":
self.io_release_task(self.current_task)
self.current_task.status = "sleep"
self.suspended.append(self.current_task)
def reschedule_running(self):
@ -434,16 +446,12 @@ class AsyncScheduler:
while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock():
pool = self.deadlines.get()
pool.timed_out = True
if self.current_task is self.entry_point:
self.paused.discard(self.current_task)
self.io_release_task(self.current_task)
self.reschedule_running()
self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point)))
self.cancel_pool(pool)
for task in pool.tasks:
if not task.done():
self.paused.discard(task)
self.io_release_task(task)
self.handle_task_exit(task, partial(task.throw, TooSlowError(task)))
self.join(task)
if pool.entry_point is self.entry_point:
self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point)))
self.run_ready.append(self.entry_point)
def schedule_tasks(self, tasks: List[Task]):
"""
@ -477,6 +485,7 @@ class AsyncScheduler:
self.run_ready.append(task)
self.debugger.after_sleep(task, slept)
def get_closest_deadline(self) -> float:
"""
Gets the closest expiration deadline (asleep tasks, timeouts)
@ -537,7 +546,7 @@ class AsyncScheduler:
self.run_ready.append(key.data) # Resource ready? Schedule its task
self.debugger.after_io(self.clock() - before_time)
def start(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, loop: bool = True):
def start(self, func: types.FunctionType, *args, loop: bool = True):
"""
Starts the event loop from a sync context. If the loop parameter
is false, the event loop will not start listening for events
@ -550,9 +559,12 @@ class AsyncScheduler:
self.run_ready.append(entry)
self.debugger.on_start()
if loop:
self.run()
self.has_ran = True
self.debugger.on_exit()
try:
self.run()
finally:
self.has_ran = True
self.close()
self.debugger.on_exit()
def cancel_pool(self, pool: TaskManager) -> bool:
"""
@ -613,6 +625,16 @@ class AsyncScheduler:
elif not self.done():
raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit")
self.shutdown()
# We reset the event loop's state
self.tasks = []
self.entry_point = None
self.current_pool = None
self.current_task = None
self.paused = TimeQueue(self.clock)
self.deadlines = DeadlinesQueue()
self.run_ready = deque()
self.suspended = deque()
def reschedule_joiners(self, task: Task):
"""
@ -620,21 +642,16 @@ class AsyncScheduler:
given task, if any
"""
if task.pool and task.pool.enclosed_pool and not task.pool.enclosed_pool.done():
return
for t in task.joiners:
self.run_ready.append(t)
# noinspection PyMethodMayBeStatic
def is_pool_done(self, pool: Optional[TaskManager]):
"""
Returns True if a given pool has finished
executing
"""
while pool:
if not pool.done():
return False
pool = pool.enclosed_pool
return True
if t not in self.run_ready:
# Since a task can be the parent
# of multiple children, we need to
# make sure we reschedule it only
# once, otherwise a RuntimeError will
# occur
self.run_ready.append(t)
def join(self, task: Task):
"""
@ -645,24 +662,21 @@ class AsyncScheduler:
task.joined = True
if task.finished or task.cancelled:
if task in self.tasks:
self.tasks.remove(task)
if not task.cancelled:
# This way join() returns the
# task's return value
for joiner in task.joiners:
self._data[joiner] = task.result
self.debugger.on_task_exit(task)
if task.last_io:
self.io_release_task(task)
if task in self.suspended:
self.suspended.remove(task)
# If the pool has finished executing or we're at the first parent
# task that kicked the loop, we can safely reschedule the parent(s)
if self.is_pool_done(task.pool):
if task.pool is None:
return
if task.pool.done():
self.reschedule_joiners(task)
self.reschedule_running()
elif task.exc:
if task in self.tasks:
self.tasks.remove(task)
if task in self.suspended:
self.suspended.remove(task)
task.status = "crashed"
if task.exc.__traceback__:
# TODO: We might want to do a bit more complex traceback hacking to remove any extra
@ -683,11 +697,15 @@ class AsyncScheduler:
# or just returned
for t in task.joiners.copy():
# Propagate the exception
self.handle_task_exit(t, partial(t.throw, task.exc))
if t.exc or t.finished or t.cancelled:
try:
t.throw(task.exc)
except (StopIteration, CancelledError, RuntimeError):
# TODO: Need anything else?
task.joiners.remove(t)
finally:
if t in self.tasks:
self.tasks.remove(t)
self.reschedule_joiners(task)
self.reschedule_running()
def sleep(self, seconds: int or float):
"""
@ -729,6 +747,8 @@ class AsyncScheduler:
self.io_release_task(task)
elif task.status == "sleep":
self.paused.discard(task)
if task in self.suspended:
self.suspended.remove(task)
try:
self.do_cancel(task)
except CancelledError as cancel:
@ -745,7 +765,6 @@ class AsyncScheduler:
task.cancel_pending = False
task.cancelled = True
task.status = "cancelled"
self.io_release_task(self.current_task)
self.debugger.after_cancel(task)
self.tasks.remove(task)
else:
@ -756,12 +775,12 @@ class AsyncScheduler:
def register_sock(self, sock, evt_type: str):
"""
Registers the given socket inside the
selector to perform I/0 multiplexing
selector to perform I/O multiplexing
:param sock: The socket on which a read or write operation
has to be performed
has to be performed
:param evt_type: The type of event to perform on the given
socket, either "read" or "write"
socket, either "read" or "write"
:type evt_type: str
"""
@ -795,5 +814,8 @@ class AsyncScheduler:
try:
self.selector.register(sock, evt, self.current_task)
except KeyError:
# The socket is already registered doing something else
raise ResourceBusy("The given socket is being read/written by another task") from None
# The socket is already registered doing something else, we
# modify the socket instead (or maybe not?)
self.selector.modify(sock, evt, self.current_task)
# TODO: Does this break stuff?
# raise ResourceBusy("The given socket is being read/written by another task") from None
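The deadline-propagation loops added above walk current_pool.enclosed_pool so that a timed pool's deadline is still tracked when pools are nested. A hedged sketch of the kind of nesting this appears to target (helper names are illustrative; see giambio/runtime.py below for how enclosed_pool gets set):

import giambio


async def slow(n: int):
    await giambio.sleep(n)
    print(f"slept for {n} seconds")


async def main():
    async with giambio.create_pool() as pool:        # outer pool
        await pool.spawn(slow, 1)                     # completes normally
        async with giambio.skip_after(2) as timed:    # inner timed pool, linked to the outer one via enclosed_pool
            await timed.spawn(slow, 10)               # cancelled silently once the 2-second deadline expires


giambio.run(main)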


@ -16,6 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import giambio
from giambio.exceptions import ResourceClosed
from giambio.traps import want_write, want_read, io_release
@ -121,6 +122,7 @@ class AsyncSocket:
if self.sock:
self.sock.shutdown(how)
await giambio.sleep(0) # Checkpoint
async def bind(self, addr: tuple):
"""

giambio/runtime.py

@ -18,13 +18,12 @@ limitations under the License.
import inspect
import threading
from typing import Callable, Coroutine, Any, Union
from giambio.core import AsyncScheduler
from giambio.exceptions import GiambioError
from giambio.context import TaskManager
from timeit import default_timer
from giambio.util.debug import BaseDebugger
from types import FunctionType
thread_local = threading.local()
@ -42,7 +41,7 @@ def get_event_loop():
raise GiambioError("giambio is not running") from None
def new_event_loop(debugger: BaseDebugger, clock: Callable):
def new_event_loop(debugger: BaseDebugger, clock: FunctionType):
"""
Associates a new event loop to the current thread
and deactivates the old one. This should not be
@ -63,7 +62,7 @@ def new_event_loop(debugger: BaseDebugger, clock: Callable):
thread_local.loop = AsyncScheduler(clock, debugger)
def run(func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs):
def run(func: FunctionType, *args, **kwargs):
"""
Starts the event loop from a synchronous entry point
"""
@ -96,16 +95,23 @@ def create_pool():
return TaskManager()
def with_timeout(timeout: Union[int, float]):
def with_timeout(timeout: int or float):
"""
Creates an async pool with an associated timeout
"""
assert timeout > 0, "The timeout must be greater than 0"
return TaskManager(timeout)
loop = get_event_loop()
mgr = TaskManager(timeout)
if loop.current_task is not loop.entry_point:
mgr.tasks.append(loop.current_task)
if loop.current_pool and loop.current_pool is not mgr:
loop.current_pool.enclosed_pool = mgr
return mgr
def skip_after(timeout: Union[int, float]):
def skip_after(timeout: int or float):
"""
Creates an async pool with an associated timeout, but
without raising a TooSlowError exception. The pool
@ -113,4 +119,11 @@ def skip_after(timeout: Union[int, float]):
"""
assert timeout > 0, "The timeout must be greater than 0"
return TaskManager(timeout, False)
loop = get_event_loop()
mgr = TaskManager(timeout, False)
if loop.current_task is not loop.entry_point:
mgr.tasks.append(loop.current_task)
if loop.current_pool and loop.current_pool is not mgr:
loop.current_pool.enclosed_pool = mgr
return mgr
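Note that with_timeout and skip_after now also record the calling task (and link the surrounding pool through enclosed_pool), so the body of the async with block itself is subject to the deadline, not just the children spawned inside it. A minimal sketch, mirroring the timeout tests further down:

import giambio


async def main():
    async with giambio.skip_after(2):
        await giambio.sleep(10)   # the caller itself is interrupted after ~2 seconds
    print("moving on")


giambio.run(main)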

giambio/sync.py

@ -15,10 +15,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from socket import socketpair
from abc import ABC, abstractmethod
from collections import deque
from typing import Any, Optional
from giambio.traps import event_wait, event_set, current_task, suspend, schedule_tasks, current_loop
from giambio.traps import event_wait, event_set, current_task
from giambio.exceptions import GiambioError
from giambio.socket import wrap_socket
from giambio.task import Task
class Event:
@ -45,7 +49,7 @@ class Event:
if self.set:
raise GiambioError("The event has already been set")
self.value = value
await event_set(self)
await event_set(self, value)
async def wait(self):
"""
@ -69,14 +73,43 @@ class Queue:
"""
self.maxsize = maxsize
# Stores event objects for tasks wanting to
# get items from the queue
self.getters = deque()
# Stores event objects for tasks wanting to
# put items on the queue
self.putters = deque()
self.container = deque(maxlen=maxsize)
self.container = deque()
def __len__(self):
"""
Returns the length of the queue
"""
return len(self.container)
def __repr__(self) -> str:
return f"{type(self).__name__}({f', '.join(map(str, self.container))})"
async def __aiter__(self):
"""
Implements the asynchronous iterator protocol
"""
return self
async def __anext__(self):
"""
Implements the asynchronous iterator protocol
"""
return await self.get()
async def put(self, item: Any):
"""
Pushes an element onto the queue. If the
queue is full, waits until there's
queue is full, waits until there's
enough space for the queue
"""
@ -85,8 +118,10 @@ class Queue:
if self.getters:
await self.getters.popleft().trigger(self.container.popleft())
else:
self.putters.append(Event())
await self.putters[-1].wait()
ev = Event()
self.putters.append(ev)
await ev.wait()
self.container.append(item)
async def get(self) -> Any:
"""
@ -100,5 +135,254 @@ class Queue:
await self.putters.popleft().trigger()
return self.container.popleft()
else:
self.getters.append(Event())
return await self.getters[-1].wait()
ev = Event()
self.getters.append(ev)
return await ev.wait()
async def clear(self):
"""
Clears the queue
"""
self.container.clear()
async def reset(self):
"""
Resets the queue
"""
await self.clear()
self.getters.clear()
self.putters.clear()
class Channel(ABC):
"""
A generic, two-way, full-duplex communication channel between
tasks. This is just an abstract base class!
"""
def __init__(self, maxsize: Optional[int] = None):
"""
Public object constructor
"""
self.maxsize = maxsize
self.closed = False
@abstractmethod
async def write(self, data: str):
"""
Writes data to the channel. Blocks if the internal
queue is full until a spot is available. Does nothing
if the channel has been closed
"""
return NotImplemented
@abstractmethod
async def read(self):
"""
Reads data from the channel. Blocks until
a message arrives or returns immediately if
one is already waiting
"""
return NotImplemented
@abstractmethod
async def close(self):
"""
Closes the memory channel. Any underlying
data is left for other tasks to read
"""
return NotImplemented
@abstractmethod
async def pending(self):
"""
Returns if there's pending
data to be read
"""
return NotImplemented
class MemoryChannel(Channel):
"""
A two-way communication channel between tasks based
on giambio's queueing mechanism. Operations on this
object do not perform any I/O or other system call and
are therefore extremely efficient
"""
def __init__(self, maxsize: Optional[int] = None):
"""
Public object constructor
"""
super().__init__(maxsize)
# We use a queue as our buffer
self.buffer = Queue(maxsize=maxsize)
async def write(self, data: str):
"""
Writes data to the channel. Blocks if the internal
queue is full until a spot is available. Does nothing
if the channel has been closed
"""
if self.closed:
return
await self.buffer.put(data)
async def read(self):
"""
Reads data from the channel. Blocks until
a message arrives or returns immediately if
one is already waiting
"""
return await self.buffer.get()
async def close(self):
"""
Closes the memory channel. Any underlying
data is left for other tasks to read
"""
self.closed = True
async def pending(self):
"""
Returns if there's pending
data to be read
"""
return bool(len(self.buffer))
class NetworkChannel(Channel):
"""
A two-way communication channel between tasks based
on giambio's I/O mechanisms instead of in-memory queues
"""
def __init__(self):
"""
Public object constructor
"""
super().__init__(None)
# We use a socket as our buffer instead of a queue
sockets = socketpair()
self.reader = wrap_socket(sockets[0])
self.writer = wrap_socket(sockets[1])
self._internal_buffer = b""
async def write(self, data: bytes):
"""
Writes data to the channel. Blocks if the internal
socket is not currently available. Does nothing
if the channel has been closed
"""
if self.closed:
return
await self.writer.send_all(data)
async def read(self, size: int):
"""
Reads exactly size bytes from the channel. Blocks until
enough data arrives. Extra data is cached and used on the
next read
"""
data = self._internal_buffer
while len(data) < size:
data += await self.reader.receive(size)
self._internal_buffer = data[size:]
data = data[:size]
return data
async def close(self):
"""
Closes the memory channel. Any underlying
data is flushed out of the internal socket
and is lost
"""
self.closed = True
await self.reader.close()
await self.writer.close()
async def pending(self):
"""
Returns if there's pending
data to be read
"""
# TODO: Ugly!
if self.closed:
return False
try:
self._internal_buffer += self.reader.sock.recv(1)
except BlockingIOError:
return False
return True
class Lock:
"""
A simple single-owner lock
"""
def __init__(self):
"""
Public constructor
"""
self.owner: Optional[Task] = None
self.tasks: deque[Event] = deque()
async def acquire(self):
"""
Acquires the lock
"""
task = await current_task()
if self.owner is None:
self.owner = task
elif task is self.owner:
raise RuntimeError("lock is already acquired by current task")
elif self.owner is not task:
self.tasks.append(Event())
await self.tasks[-1].wait()
self.owner = task
async def release(self):
"""
Releases the lock
"""
task = await current_task()
if self.owner is None:
raise RuntimeError("lock is not acquired")
elif self.owner is not task:
raise RuntimeError("lock can only released by its owner")
elif self.tasks:
await self.tasks.popleft().trigger()
else:
self.owner = None
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, *args):
await self.release()
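The new Lock is single-owner: re-acquiring it from the owning task raises, and other tasks queue up on an Event until release() wakes the first waiter. A usage sketch (worker/main are illustrative names):

import giambio


lock = giambio.Lock()


async def worker(name: str):
    async with lock:               # acquire() on entry, release() on exit
        print(f"{name} holds the lock")
        await giambio.sleep(1)     # other workers wait their turn here


async def main():
    async with giambio.create_pool() as pool:
        for name in ("one", "two", "three"):
            await pool.spawn(worker, name)


giambio.run(main)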

giambio/traps.py

@ -149,7 +149,7 @@ async def cancel(task):
"""
await create_trap("cancel", task)
assert task.cancelled, f"Task ignored CancelledError"
assert task.done(), f"Task ignored CancelledError"
async def want_read(stream):
@ -194,17 +194,18 @@ async def event_wait(event):
return
task = await current_task()
event.waiters.add(task)
return await create_trap("suspend")
return await suspend()
async def event_set(event):
async def event_set(event, value):
"""
Sets the given event and reawakens its
waiters
"""
event.set = True
event.value = value
loop = await current_loop()
for waiter in event.waiters:
loop._data[waiter] = event.value
await schedule_tasks(event.waiters)
await schedule_tasks(event.waiters)
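With this change event_set stores the value on the event and hands it to every waiter through the loop's _data mapping, so Event.wait() resumes with whatever was passed when the event was triggered. A sketch (the trigger() name is inferred from its use in giambio/sync.py above; waiter/setter are illustrative):

import giambio


async def waiter(ev: giambio.Event):
    value = await ev.wait()            # resumes with the value passed to trigger()
    print(f"woke up with {value!r}")


async def setter(ev: giambio.Event):
    await giambio.sleep(1)
    await ev.trigger("hello")          # sets the event and reschedules all waiters


async def main():
    ev = giambio.Event()
    async with giambio.create_pool() as pool:
        await pool.spawn(waiter, ev)
        await pool.spawn(setter, ev)


giambio.run(main)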

giambio/util/debug.py

@ -31,7 +31,7 @@ class BaseDebugger(ABC):
loop starts executing
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_exit(self):
@ -40,7 +40,7 @@ class BaseDebugger(ABC):
loop exits entirely (all tasks completed)
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_task_schedule(self, task: Task, delay: float):
@ -49,14 +49,14 @@ class BaseDebugger(ABC):
scheduled (not spawned)
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param delay: The delay, in seconds, after which
the task will start executing
the task will start executing
:type delay: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_task_spawn(self, task: Task):
@ -65,11 +65,11 @@ class BaseDebugger(ABC):
spawned
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_task_exit(self, task: Task):
@ -77,11 +77,11 @@ class BaseDebugger(ABC):
This method is called when a task exits
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_task_step(self, task: Task):
@ -90,11 +90,11 @@ class BaseDebugger(ABC):
calling a task's run() method
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_task_step(self, task: Task):
@ -103,11 +103,11 @@ class BaseDebugger(ABC):
calling a task's run() method
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_sleep(self, task: Task, seconds: float):
@ -116,14 +116,14 @@ class BaseDebugger(ABC):
to sleep
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param seconds: The amount of seconds the
task wants to sleep
task wants to sleep
:type seconds: int
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_sleep(self, task: Task, seconds: float):
@ -132,14 +132,14 @@ class BaseDebugger(ABC):
awakes from sleeping
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param seconds: The amount of seconds the
task actually slept
task actually slept
:type seconds: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_io(self, timeout: float):
@ -148,12 +148,12 @@ class BaseDebugger(ABC):
the event loop checks for I/O events
:param timeout: The max. amount of seconds
that the loop will hang when using the select()
system call
that the loop will hang when using the select()
system call
:type timeout: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_io(self, timeout: float):
@ -162,12 +162,12 @@ class BaseDebugger(ABC):
the event loop has checked for I/O events
:param timeout: The actual amount of seconds
that the loop has hung when using the select()
system call
that the loop has hung when using the select()
system call
:type timeout: float
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def before_cancel(self, task: Task):
@ -176,11 +176,11 @@ class BaseDebugger(ABC):
gets cancelled
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def after_cancel(self, task: Task) -> object:
@ -189,11 +189,11 @@ class BaseDebugger(ABC):
gets cancelled
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
"""
raise NotImplementedError
return NotImplemented
@abstractmethod
def on_exception_raised(self, task: Task, exc: BaseException):
@ -202,10 +202,10 @@ class BaseDebugger(ABC):
has raised an exception
:param task: The Task object representing a
giambio Task and wrapping a coroutine
giambio Task and wrapping a coroutine
:type task: :class: giambio.objects.Task
:param exc: The exception that was raised
:type exc: BaseException
"""
raise NotImplementedError
return NotImplemented
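The tests below import a Debugger helper (from debugger import Debugger) that is not part of this diff; a hypothetical minimal subclass of BaseDebugger, with the uninteresting hooks stubbed out, might look like this:

from giambio.util.debug import BaseDebugger
from giambio.task import Task


class PrintDebugger(BaseDebugger):
    def on_start(self):
        print("loop started")

    def on_exit(self):
        print("loop exited")

    def on_task_spawn(self, task: Task):
        print(f"spawned {task}")

    def on_task_exit(self, task: Task):
        print(f"exited {task}")

    def on_exception_raised(self, task: Task, exc: BaseException):
        print(f"{task} raised {type(exc).__name__}: {exc}")

    # The remaining hooks are abstract too, so they must be
    # overridden even if we do not care about them
    def on_task_schedule(self, task: Task, delay: float): pass
    def before_task_step(self, task: Task): pass
    def after_task_step(self, task: Task): pass
    def before_sleep(self, task: Task, seconds: float): pass
    def after_sleep(self, task: Task, seconds: float): pass
    def before_io(self, timeout: float): pass
    def after_io(self, timeout: float): pass
    def before_cancel(self, task: Task): pass
    def after_cancel(self, task: Task): pass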

setup.py

@ -5,7 +5,7 @@ with open("README.md", "r") as readme:
setuptools.setup(
name="GiambIO",
version="1.0.1",
version="0.5.0",
author="Nocturn9x",
author_email="hackhab@gmail.com",
description="Asynchronous Python made easy (and friendly)",


@ -8,6 +8,7 @@ async def sender(sock: giambio.socket.AsyncSocket, q: giambio.Queue):
while True:
await sock.send_all(b"yo")
await q.put((0, ""))
await giambio.sleep(1)
async def receiver(sock: giambio.socket.AsyncSocket, q: giambio.Queue):

tests/memory_channel.py (new file, 31 lines)

@ -0,0 +1,31 @@
import giambio
from debugger import Debugger
async def sender(c: giambio.MemoryChannel, n: int):
for i in range(n):
await c.write(str(i))
print(f"Sent {i}")
await c.close()
print("Sender done")
async def receiver(c: giambio.MemoryChannel):
while True:
if not await c.pending() and c.closed:
print("Receiver done")
break
item = await c.read()
print(f"Received {item}")
await giambio.sleep(1)
async def main(channel: giambio.MemoryChannel, n: int):
async with giambio.create_pool() as pool:
await pool.spawn(sender, channel, n)
await pool.spawn(receiver, channel)
channel = giambio.MemoryChannel(2)
giambio.run(main, channel, 5, debugger=())

tests/proxy.py (new file, 56 lines)

@ -0,0 +1,56 @@
from debugger import Debugger
import giambio
import socket
async def proxy_one_way(source: giambio.socket.AsyncSocket, sink: giambio.socket.AsyncSocket):
"""
Sends data from source to sink
"""
sink_addr = ":".join(map(str, await sink.getpeername()))
source_addr = ":".join(map(str, await source.getpeername()))
while True:
data = await source.receive(1024)
if not data:
print(f"{source_addr} has exited, closing connection to {sink_addr}")
await sink.shutdown(socket.SHUT_WR)
break
print(f"Got {data.decode('utf8', errors='ignore')!r} from {source_addr}, forwarding it to {sink_addr}")
await sink.send_all(data)
async def proxy_two_way(a: giambio.socket.AsyncSocket, b: giambio.socket.AsyncSocket):
"""
Sets up a two-way proxy from a to b and from b to a
"""
async with giambio.create_pool() as pool:
await pool.spawn(proxy_one_way, a, b)
await pool.spawn(proxy_one_way, b, a)
async def main(delay: int, a: tuple, b: tuple):
"""
Sets up the proxy
"""
start = giambio.clock()
print(f"Starting two-way proxy from {a[0]}:{a[1]} to {b[0]}:{b[1]}, lasting for {delay} seconds")
async with giambio.skip_after(delay) as p:
sock_a = giambio.socket.socket()
sock_b = giambio.socket.socket()
await sock_a.connect(a)
await sock_b.connect(b)
async with sock_a, sock_b:
await proxy_two_way(sock_a, sock_b)
print(f"Proxy has exited after {giambio.clock() - start:.2f} seconds")
try:
giambio.run(main, 60, ("localhost", 12345), ("localhost", 54321), debugger=())
except (Exception, KeyboardInterrupt) as error: # Exceptions propagate!
if isinstance(error, KeyboardInterrupt):
print("Ctrl+C detected, exiting")
else:
print(f"Exiting due to a {type(error).__name__}: {error}")

tests/queue.py

@ -4,6 +4,9 @@ from debugger import Debugger
async def producer(q: giambio.Queue, n: int):
for i in range(n):
# This will wait until the
# queue is emptied by the
# consumer
await q.put(i)
print(f"Produced {i}")
await q.put(None)
@ -12,19 +15,25 @@ async def producer(q: giambio.Queue, n: int):
async def consumer(q: giambio.Queue):
while True:
# Hangs until there is
# something on the queue
item = await q.get()
if item is None:
print("Consumer done")
break
print(f"Consumed {item}")
# Simulates some work so the
# producer waits before putting
# the next value
await giambio.sleep(1)
async def main(q: giambio.Queue, n: int):
async with giambio.create_pool() as pool:
await pool.spawn(consumer, q)
await pool.spawn(producer, q, n)
await pool.spawn(consumer, q)
print("Bye!")
queue = giambio.Queue()
queue = giambio.Queue(1) # Queue has size limit of 1
giambio.run(main, queue, 5, debugger=())
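Since Queue now implements the asynchronous iterator protocol (see giambio/sync.py above), the consumer in this test could equivalently drain the queue with async for. One caveat: for async for to accept it on current Python versions, __aiter__ has to be a plain method returning self rather than a coroutine, so the async def on it would need to be dropped. A sketch of the alternative consumer:

async def consumer(q: giambio.Queue):
    async for item in q:         # each iteration awaits q.get()
        if item is None:
            print("Consumer done")
            break
        print(f"Consumed {item}")
        await giambio.sleep(1)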

tests/task_ipc.py (new file, 34 lines)

@ -0,0 +1,34 @@
## Simple task IPC using giambio's MemoryChannel class
import random
import string
import giambio
from debugger import Debugger
async def task(c: giambio.MemoryChannel, name: str):
while True:
if await c.pending():
print(f"[{name}] Received {await c.read()!r}")
else:
data = "".join(random.choice(string.ascii_letters) for _ in range(8))
print(f"[{name}] Sending {data!r}")
await c.write(data)
await giambio.sleep(1)
async def main(channel: giambio.MemoryChannel, delay: int):
print(f"[main] Spawning workers, exiting in {delay} seconds")
async with giambio.skip_after(delay) as pool:
await pool.spawn(task, channel, "one")
await pool.spawn(task, channel, "two")
await pool.spawn(task, channel, "three")
await channel.close()
print(f"[main] Operation complete, channel closed")
if await channel.pending():
print(f"[main] Channel has {len(channel.buffer)} leftover packet{'s' if len(channel.buffer) > 1 else ''} of data, clearing it")
while await channel.pending():
print(f"[main] Cleared {await channel.read()!r}")
channel = giambio.MemoryChannel()
giambio.run(main, channel, 6, debugger=())

tests/task_ipc2.py (new file, 32 lines)

@ -0,0 +1,32 @@
import random
import string
import giambio
from debugger import Debugger
async def task(c: giambio.NetworkChannel, name: str):
while True:
if await c.pending():
print(f"[{name}] Received {(await c.read(8)).decode()!r}")
else:
data = "".join(random.choice(string.ascii_letters) for _ in range(8))
print(f"[{name}] Sending {data!r}")
await c.write(data.encode())
await giambio.sleep(1)
async def main(channel: giambio.NetworkChannel, delay: int):
print(f"[main] Spawning workers, exiting in {delay} seconds")
async with giambio.skip_after(delay) as pool:
await pool.spawn(task, channel, "one")
await pool.spawn(task, channel, "two")
await channel.close()
print(f"[main] Operation complete, channel closed")
if await channel.pending():
print(f"[main] Channel has leftover data, clearing it")
while await channel.pending():
print(f"[main] Cleared {await channel.read(1)!r}")
channel = giambio.NetworkChannel()
giambio.run(main, channel, 4, debugger=())

tests/timeout.py

@ -12,11 +12,11 @@ async def main():
start = giambio.clock()
try:
async with giambio.with_timeout(12) as pool:
await pool.spawn(child, 7) # This will complete
await giambio.sleep(2) # This will make the code below wait 2 seconds
await pool.spawn(child, 7) # This will complete
await giambio.sleep(2) # This will make the code below wait 2 seconds
await pool.spawn(child, 15) # This will not complete
await giambio.sleep(50)
await child(20) # Neither will this
await child(20) # Neither will this
await giambio.sleep(50) # Nor this
except giambio.exceptions.TooSlowError:
print("[main] One or more children have timed out!")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")


@ -11,11 +11,11 @@ async def child(name: int):
async def main():
start = giambio.clock()
async with giambio.skip_after(10) as pool:
await pool.spawn(child, 7) # This will complete
await giambio.sleep(2) # This will make the code below wait 2 seconds
await pool.spawn(child, 7) # This will complete
await giambio.sleep(2) # This will make the code below wait 2 seconds
await pool.spawn(child, 15) # This will not complete
await giambio.sleep(50)
await child(20) # Neither will this
await giambio.sleep(50) # Neither will this
await child(20) # Nor this
if pool.timed_out:
print("[main] One or more children have timed out!")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")