Browse Source

Beautified and simplified some code + README additions

master
nocturn9x 1 year ago
parent
commit
44a07521b2
  1. 7
      README.md
  2. 2
      giambio/__init__.py
  3. 8
      giambio/context.py
  4. 122
      giambio/core.py
  5. 4
      giambio/exceptions.py
  6. 5
      giambio/internal.py
  7. 8
      giambio/io.py
  8. 3
      giambio/sync.py
  9. 10
      giambio/task.py
  10. 15
      giambio/traps.py
  11. 8
      tests/cancel.py
  12. 4
      tests/debugger.py
  13. 4
      tests/events.py
  14. 4
      tests/exceptions.py
  15. 4
      tests/nested_exception.py
  16. 4
      tests/nested_pool.py
  17. 12
      tests/server.py
  18. 4
      tests/sleep.py
  19. 4
      tests/timeout.py

7
README.md

@ -59,6 +59,13 @@ Giambio means to take the best of all of its predecessors, while being:
- Dependency-free: No fancy C modules, no external libraries, just pure idiomatic Python code
- Community-based: I frankly wouldn't have bothered making this if curio was open to community additions
Another problem I would like to address and that I've heard some developers rant about is the lack of control
that the `run()` paradigm causes: you can read a way better and more detailed explanation [here](https://gist.github.com/Justasic/b57bfd05dd8e7a108bc433c8c9a66e59).
Giambio fixes this problem by exposing all of its internal machinery to the public and also allowing
to not start listening for events automatically by doing `AsyncScheduler(...).start(..., loop=False)`, in which case
the responsibility of handling everything (including loop ticks) is transferred to the end user allowing for a much
more granular control of the loop according to one's needs.
## Current limitations
giambio is **highly** experimental and there's a lot to work to do before it's usable. Namely:

2
giambio/__init__.py

@ -20,7 +20,7 @@ __author__ = "Nocturn9x"
__version__ = (0, 0, 1)
from . import exceptions, socket, context, core
from . import exceptions, socket, context, core, task, io
from .traps import sleep, current_task
from .sync import Event
from .run import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout

8
giambio/context.py

@ -34,15 +34,13 @@ class TaskManager:
Object constructor
"""
# The event loop associated with this pool
self.loop: giambio.core.AsyncScheduler = giambio.get_event_loop()
# All the tasks that belong to this pool
self.tasks: List[giambio.objects.Task] = []
self.tasks: List[giambio.task.Task] = []
# Whether we have been cancelled or not
self.cancelled: bool = False
# The clock time of when we started running, used for
# timeouts expiration
self.started: float = self.loop.clock()
self.started: float = giambio.clock()
# The pool's timeout (in seconds)
if timeout:
self.timeout: float = self.started + timeout
@ -57,7 +55,7 @@ class TaskManager:
Spawns a child task
"""
assert self._proper_init
assert self._proper_init, "Cannot use improperly initialized pool"
return await giambio.traps.create_task(func, *args)
async def __aenter__(self):

122
giambio/core.py

@ -18,15 +18,11 @@ limitations under the License.
# Import libraries and internal resources
import types
import socket
from itertools import chain
from giambio.task import Task
from giambio.sync import Event
from timeit import default_timer
from giambio.context import TaskManager
from typing import List, Optional, Set, Any
from typing import List, Optional, Any
from giambio.util.debug import BaseDebugger
from giambio.traps import want_read, want_write
from giambio.internal import TimeQueue, DeadlinesQueue
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from giambio.exceptions import (
@ -105,7 +101,7 @@ class AsyncScheduler:
# Tasks that are ready to run
self.run_ready: List[Task] = []
# Selector object to perform I/O multiplexing
self.selector: DefaultSelector = DefaultSelector()
self.selector = selector or DefaultSelector()
# This will always point to the currently running coroutine (Task object)
self.current_task: Optional[Task] = None
# Monotonic clock to keep track of elapsed time reliably
@ -134,11 +130,25 @@ class AsyncScheduler:
Returns repr(self)
"""
fields = {"debugger", "tasks", "run_ready", "selector", "current_task",
"clock", "paused", "has_ran", "current_pool", "io_skip",
"deadlines", "_data", "io_skip_limit", "io_max_timeout"
}
data = ", ".join(name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields)))
fields = {
"debugger",
"tasks",
"run_ready",
"selector",
"current_task",
"clock",
"paused",
"has_ran",
"current_pool",
"io_skip",
"deadlines",
"_data",
"io_skip_limit",
"io_max_timeout",
}
data = ", ".join(
name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields))
)
return f"{type(self).__name__}({data})"
def done(self) -> bool:
@ -146,18 +156,16 @@ class AsyncScheduler:
Returns True if there is no work to do
"""
if any([self.paused, self.run_ready, self.selector.get_map()]):
return False
return True
return not any([self.paused, self.run_ready, self.selector.get_map()])
def shutdown(self):
"""
Shuts down the event loop
"""
for task in self.tasks:
self.io_release_task(task)
self.selector.close()
self.tasks = []
self.current_task = self.current_pool = None
# TODO: Anything else?
def run(self):
@ -198,11 +206,7 @@ class AsyncScheduler:
if self.paused:
# Next we try to (re)schedule the asleep tasks
self.awake_sleeping()
if (
self.current_pool
and self.current_pool.timeout
and not self.current_pool.timed_out
):
if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out:
# Stores deadlines for tasks (deadlines are pool-specific).
# The deadlines queue will internally make sure not to store
# a deadline for the same pool twice. This makes the timeouts
@ -232,14 +236,21 @@ class AsyncScheduler:
self.current_task.exc = err
self.join(self.current_task)
def create_task(self, coro, *args) -> Task:
def create_task(self, corofunc: types.FunctionType, *args, **kwargs) -> Task:
"""
Creates a task
Creates a task from a coroutine function and schedules it
to run. Any extra keyword or positional argument are then
passed to the function
:param corofunc: The coroutine function (not a coroutine!) to
spawn
:type corofunc: function
"""
task = Task(coro.__name__ or str(coro), coro(*args), self.current_pool)
task = Task(corofunc.__name__ or str(corofunc), corofunc(*args, **kwargs), self.current_pool)
task.next_deadline = self.current_pool.timeout or 0.0
task.joiners = {self.current_task}
self._data = task
self.tasks.append(task)
self.run_ready.append(task)
self.debugger.on_task_spawn(task)
@ -251,7 +262,7 @@ class AsyncScheduler:
"""
Runs a single step for the current task.
A step ends when the task awaits any of
giambio's primitives or async methods.
our primitives or async methods.
Note that this method does NOT catch any
exception arising from tasks, nor does it
@ -278,7 +289,7 @@ class AsyncScheduler:
# somewhere)
method, *args = self.current_task.run(data)
if data is self._data:
self._data = None
self._data = None
# Some debugging and internal chatter here
self.current_task.status = "run"
self.current_task.steps += 1
@ -290,8 +301,7 @@ class AsyncScheduler:
# compared to us. If you get this exception and you're 100% sure you're
# not mixing async primitives from other libraries, then it's a bug!
raise InternalError(
"Uh oh! Something very bad just happened, did"
" you try to mix primitives from other async libraries?"
"Uh oh! Something very bad just happened, did you try to mix primitives from other async libraries?"
) from None
# Sneaky method call, thanks to David Beazley for this ;)
getattr(self, method)(*args)
@ -320,12 +330,19 @@ class AsyncScheduler:
if self.selector.get_map() and sock in self.selector.get_map():
self.selector.unregister(sock)
def suspend(self):
def suspend(self, task: Task):
"""
Suspends execution of the current task
Suspends execution of the given task. This is basically
a do-nothing method, since it will not reschedule the task
before returning. The task will stay suspended as long as
something else outside the loop calls a trap to reschedule it.
This method will unregister any I/O as well to ensure the task
isn't rescheduled in further calls to select()
"""
... # TODO: Unschedule I/O?
if task.last_io:
self.io_release_task(task)
def reschedule_running(self):
"""
@ -334,6 +351,8 @@ class AsyncScheduler:
if self.current_task:
self.run_ready.append(self.current_task)
else:
raise GiambioError("giambio is not running")
def do_cancel(self, task: Task):
"""
@ -357,7 +376,6 @@ class AsyncScheduler:
self._data = self.current_task
self.reschedule_running()
def get_current_pool(self):
"""
'Returns' the current pool to an async caller
@ -366,7 +384,6 @@ class AsyncScheduler:
self._data = self.current_pool
self.reschedule_running()
def get_current_loop(self):
"""
'Returns' self to an async caller
@ -394,6 +411,7 @@ class AsyncScheduler:
"""
self.run_ready.extend(tasks)
self.reschedule_running()
def awake_sleeping(self):
"""
@ -470,7 +488,9 @@ class AsyncScheduler:
def start(self, func: types.FunctionType, *args, loop: bool = True):
"""
Starts the event loop from a sync context
Starts the event loop from a sync context. If the loop parameter
is false, the event loop will not start listening for events
automatically and the dispatching is on the users' shoulders
"""
entry = Task(func.__name__ or str(func), func(*args), None)
@ -502,7 +522,7 @@ class AsyncScheduler:
else: # If we're at the main task, we're sure everything else exited
return True
def get_all_tasks(self) -> chain:
def get_all_tasks(self) -> List[Task]:
"""
Returns a list of all the tasks the loop is currently
keeping track of: this includes both running and paused tasks.
@ -536,9 +556,7 @@ class AsyncScheduler:
if ensure_done:
self.cancel_all()
elif not self.done():
raise GiambioError(
"event loop not terminated, call this method with ensure_done=False to forcefully exit"
)
raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit")
self.shutdown()
def reschedule_joiners(self, task: Task):
@ -639,9 +657,7 @@ class AsyncScheduler:
# or dangling resource open after being cancelled, so maybe we need
# a different approach altogether
if task.status == "io":
for k in filter(
lambda o: o.data == task, dict(self.selector.get_map()).values()
):
for k in filter(lambda o: o.data == task, dict(self.selector.get_map()).values()):
self.selector.unregister(k.fileobj)
elif task.status == "sleep":
self.paused.discard(task)
@ -711,26 +727,4 @@ class AsyncScheduler:
self.selector.register(sock, evt, self.current_task)
except KeyError:
# The socket is already registered doing something else
raise ResourceBusy(
"The given socket is being read/written by another task"
) from None
# noinspection PyMethodMayBeStatic
async def connect_sock(self, sock: socket.socket, address_tuple: tuple):
"""
Connects a socket asynchronously to a given endpoint
:param sock: The socket that must to be connected
:type sock: socket.socket
:param address_tuple: A tuple in the same form as the one
passed to socket.socket.connect with an address as a string
and a port as an integer
:type address_tuple: tuple
"""
await want_write(sock)
try:
return sock.connect(address_tuple)
except BlockingIOError:
await want_write(sock)
return sock.connect(address_tuple)
raise ResourceBusy("The given socket is being read/written by another task") from None

4
giambio/exceptions.py

@ -95,7 +95,9 @@ class ErrorStack(GiambioError):
tracebacks = ""
for i, err in enumerate(self.errors):
if i not in (1, len(self.errors)):
tracebacks += f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}\n{'-' * 32}\n"
tracebacks += (
f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}\n{'-' * 32}\n"
)
else:
tracebacks += f"\n{''.join(traceback.format_exception(type(err), err, err.__traceback__))}"
return f"Multiple errors occurred:\n{tracebacks}"

5
giambio/internal.py

@ -15,8 +15,11 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import giambio
from typing import List, Tuple
from giambio.task import Task
from heapq import heappush, heappop
from heapq import heappush, heappop, heapify
class TimeQueue:

8
giambio/io.py

@ -169,7 +169,7 @@ class AsyncSocket:
Wrapper socket method
"""
raise RuntimeError('Use with_timeout() to set a timeout')
raise RuntimeError("Use with_timeout() to set a timeout")
def gettimeout(self):
"""
@ -205,15 +205,15 @@ class AsyncSocket:
try:
result = self.sock.connect(address)
if getattr(self, 'do_handshake_on_connect', False):
if getattr(self, "do_handshake_on_connect", False):
await self.do_handshake()
return result
except WantWrite:
await want_write(self.sock)
err = self.sock.getsockopt(SOL_SOCKET, SO_ERROR)
if err != 0:
raise OSError(err, f'Connect call failed {address}')
if getattr(self, 'do_handshake_on_connect', False):
raise OSError(err, f"Connect call failed {address}")
if getattr(self, "do_handshake_on_connect", False):
await self.do_handshake()
async def recvfrom(self, buffersize, flags=0):

3
giambio/sync.py

@ -16,6 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
from giambio.traps import event_wait, event_set
from giambio.exceptions import GiambioError
class Event:
@ -38,7 +39,7 @@ class Event:
"""
if self.set:
raise giambio.exceptions.GiambioError("The event has already been set")
raise GiambioError("The event has already been set")
await event_set(self)
async def wait(self):

10
giambio/task.py

@ -18,7 +18,7 @@ limitations under the License.
import giambio
from dataclasses import dataclass, field
from typing import Union, Coroutine, List, Tuple, Set
from typing import Union, Coroutine, Set
@dataclass
@ -104,8 +104,9 @@ class Task:
are propagated as well
"""
self.joiners.add(await giambio.traps.current_task())
print(self.joiners)
task = await giambio.traps.current_task()
if task:
self.joiners.add(task)
res = await giambio.traps.join(self)
if self.exc:
raise self.exc
@ -142,5 +143,4 @@ class Task:
self.coroutine.close()
except RuntimeError:
pass # TODO: This is kinda bad
assert not self.last_io
assert not self.last_io, f"task {self.name} was destroyed, but has pending I/O"

15
giambio/traps.py

@ -25,7 +25,7 @@ import types
import inspect
from giambio.task import Task
from types import FunctionType
from typing import List, Union, Iterable
from typing import Union, Iterable
from giambio.exceptions import GiambioError
@ -58,7 +58,7 @@ async def create_task(coro: FunctionType, *args):
elif inspect.iscoroutinefunction(coro):
return await create_trap("create_task", coro, *args)
else:
raise TypeError("coro must be a coroutine or coroutine function")
raise TypeError("coro must be a coroutine function")
async def sleep(seconds: Union[int, float]):
@ -122,7 +122,7 @@ async def join(task):
Awaits a given task for completion
:param task: The task to join
:type task: class: Task
:type task: :class: Task
"""
return await create_trap("join", task)
@ -187,7 +187,6 @@ async def event_set(event):
"""
event.set = True
await reschedule_running()
await schedule_tasks(event.waiters)
@ -197,11 +196,3 @@ async def schedule_tasks(tasks: Iterable[Task]):
"""
await create_trap("schedule_tasks", tasks)
async def reschedule_running():
"""
Reschedules the current task for execution
"""
await create_trap("reschedule_running")

8
tests/cancel.py

@ -11,15 +11,11 @@ async def child(name: int):
async def main():
start = giambio.clock()
async with giambio.create_pool() as pool:
await pool.spawn(
child, 1
) # If you comment this line, the pool will exit immediately!
await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately!
task = await pool.spawn(child, 2)
await task.cancel()
print("[main] Children spawned, awaiting completion")
print(
f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds"
)
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":

4
tests/debugger.py

@ -13,9 +13,7 @@ class Debugger(giambio.debug.BaseDebugger):
print("## Finished running")
def on_task_schedule(self, task, delay: int):
print(
f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds"
)
print(f">> A task named '{task.name}' was scheduled to run in {delay:.2f} seconds")
def on_task_spawn(self, task):
print(f">> A task named '{task.name}' was spawned")

4
tests/events.py

@ -15,9 +15,7 @@ async def child(ev: giambio.Event, pause: int):
await giambio.sleep(pause)
end_sleep = giambio.clock() - start_sleep
end_total = giambio.clock() - start_total
print(
f"[child] Done! Slept for {end_total} seconds total ({end_pause} paused, {end_sleep} sleeping), nice nap!"
)
print(f"[child] Done! Slept for {end_total} seconds total ({end_pause} paused, {end_sleep} sleeping), nice nap!")
async def parent(pause: int = 1):

4
tests/exceptions.py

@ -26,9 +26,7 @@ async def main():
except Exception as error:
# Because exceptions just *work*!
print(f"[main] Exception from child caught! {repr(error)}")
print(
f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds"
)
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":

4
tests/nested_exception.py

@ -43,9 +43,7 @@ async def main():
except Exception as error:
# Because exceptions just *work*!
print(f"[main] Exception from child caught! {repr(error)}")
print(
f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds"
)
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":

4
tests/nested_pool.py

@ -19,9 +19,7 @@ async def main():
print("[main] Children spawned, awaiting completion")
# This will *only* execute when everything inside the async with block
# has ran, including any other pool
print(
f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds"
)
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":

12
tests/server.py

@ -29,9 +29,7 @@ async def handler(sock: AsyncSocket, client_address: tuple):
"""
Handles a single client connection
:param sock: The giambio.socket.AsyncSocket object connected
to the client
:type sock: :class: giambio.socket.AsyncSocket
:param sock: The AsyncSocket object connected to the client
:param client_address: The client's address represented as a tuple
(address, port) where address is a string and port is an integer
:type client_address: tuple
@ -39,9 +37,7 @@ async def handler(sock: AsyncSocket, client_address: tuple):
address = f"{client_address[0]}:{client_address[1]}"
async with sock: # Closes the socket automatically
await sock.send_all(
b"Welcome to the server pal, feel free to send me something!\n"
)
await sock.send_all(b"Welcome to the server pal, feel free to send me something!\n")
while True:
await sock.send_all(b"-> ")
data = await sock.receive(1024)
@ -49,9 +45,7 @@ async def handler(sock: AsyncSocket, client_address: tuple):
break
elif data == b"exit\n":
await sock.send_all(b"I'm dead dude\n")
raise TypeError(
"Oh, no, I'm gonna die!"
) # This kills the entire application!
raise TypeError("Oh, no, I'm gonna die!") # This kills the entire application!
logging.info(f"Got: {data!r} from {address}")
await sock.send_all(b"Got: " + data)
logging.info(f"Echoed back {data!r} to {address}")

4
tests/sleep.py

@ -19,9 +19,7 @@ async def main():
await pool.spawn(child)
await pool.spawn(child1)
print("[main] Children spawned, awaiting completion")
print(
f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds"
)
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":

4
tests/timeout.py

@ -16,9 +16,7 @@ async def main():
await child(20) # TODO: Broken
except giambio.exceptions.TooSlowError:
print("[main] One or more children have timed out!")
print(
f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds"
)
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":

Loading…
Cancel
Save