diff --git a/giambio/__init__.py b/giambio/__init__.py index 3ef8670..0dc5923 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -21,10 +21,10 @@ __version__ = (0, 0, 1) from . import exceptions, socket, context, core, task, io -from .traps import sleep, current_task -from .sync import Event -from .run import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after -from .util import debug +from giambio.traps import sleep, current_task +from giambio.sync import Event +from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after +from giambio.util import debug __all__ = [ diff --git a/giambio/core.py b/giambio/core.py index 6a74379..7023ce4 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -20,9 +20,10 @@ limitations under the License. import types from giambio.task import Task from collections import deque +from functools import partial from timeit import default_timer from giambio.context import TaskManager -from typing import List, Optional, Any, Dict +from typing import Callable, List, Optional, Any, Dict from giambio.util.debug import BaseDebugger from giambio.internal import TimeQueue, DeadlinesQueue from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE @@ -125,6 +126,7 @@ class AsyncScheduler: self.io_skip_limit = io_skip_limit or 5 # The max. I/O timeout self.io_max_timeout = io_max_timeout or 86400 + self.entry_point: Optional[Task] = None def __repr__(self): """ @@ -211,12 +213,7 @@ class AsyncScheduler: self.check_io() if self.deadlines: # Deadline expiration is our next step - try: - self.prune_deadlines() - except TooSlowError as t: - task = t.args[0] - task.exc = t - self.join(task) + self.prune_deadlines() if self.paused: # Next we try to (re)schedule the asleep tasks self.awake_sleeping() @@ -409,6 +406,26 @@ class AsyncScheduler: self._data[self.current_task] = self self.reschedule_running() + def handle_task_exit(self, task: Task, to_call: Callable): + """ + Convenience method for handling StopIteration + exceptions from tasks + """ + + try: + to_call() + except StopIteration as ret: + task.status = "end" + task.result = ret.value + task.finished = True + self.join(task) + self.tasks.remove(task) + except BaseException as err: + task.exc = err + self.join(task) + if task in self.tasks: + self.tasks.remove(task) + def prune_deadlines(self): """ Removes expired deadlines after their timeout @@ -417,14 +434,14 @@ class AsyncScheduler: while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock(): pool = self.deadlines.get() - if pool.done(): - continue pool.timed_out = True + if not pool.tasks and self.current_task is self.entry_point: + self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point))) for task in pool.tasks: if not task.done(): self.paused.discard(task) self.io_release_task(task) - task.throw(TooSlowError(task)) + self.handle_task_exit(task, partial(task.throw, TooSlowError(task))) def schedule_tasks(self, tasks: List[Task]): """ @@ -448,7 +465,6 @@ class AsyncScheduler: # expected if t.done() or t in self.run_ready: self.paused.discard(t) - print(t is self.current_task) while self.paused and self.paused.get_closest_deadline() <= self.clock(): # Reschedules tasks when their deadline has elapsed task = self.paused.get() @@ -525,6 +541,7 @@ class AsyncScheduler: entry = Task(func.__name__ or str(func), func(*args), None) self.tasks.append(entry) + self.entry_point = entry self.run_ready.append(entry) self.debugger.on_start() if loop: diff --git a/giambio/internal.py b/giambio/internal.py index cecca6f..0449e03 100644 --- a/giambio/internal.py +++ b/giambio/internal.py @@ -29,7 +29,7 @@ class TimeQueue: :param clock: The same monotonic clock that was passed to the thread-local event loop. It is important for the queue to be synchronized with the loop as this allows - the sleeping mechanism to work reliably + the sleeping mechanism to work reliably """ def __init__(self, clock): @@ -44,7 +44,7 @@ class TimeQueue: self.sequence = 0 self.container: List[Tuple[float, int, Task]] = [] - def __contains__(self, item): + def __contains__(self, item: Task): """ Implements item in self. This method behaves as if the queue only contained tasks and ignores @@ -56,7 +56,7 @@ class TimeQueue: return True return False - def index(self, item): + def index(self, item: Task): """ Returns the index of the given item in the list or -1 if it is not present @@ -67,7 +67,7 @@ class TimeQueue: return i return -1 - def discard(self, item): + def discard(self, item: Task): """ Discards an item from the queue and calls heapify(self.container) to keep @@ -112,7 +112,7 @@ class TimeQueue: except IndexError: raise StopIteration from None - def __getitem__(self, item): + def __getitem__(self, item: int): """ Implements self[n] """ diff --git a/giambio/io.py b/giambio/io.py index b3da5a3..8653a72 100644 --- a/giambio/io.py +++ b/giambio/io.py @@ -16,9 +16,6 @@ See the License for the specific language governing permissions and limitations under the License. """ -import ssl -from socket import SOL_SOCKET, SO_ERROR -import socket as builtin_socket from giambio.exceptions import ResourceClosed from giambio.traps import want_write, want_read, io_release @@ -114,8 +111,8 @@ class AsyncSocket: raise ResourceClosed("I/O operation on closed socket") await io_release(self.sock) self.sock.close() - self._sock = None - self.sock = -1 + self._fd = -1 + self.sock = None async def shutdown(self, how): """ diff --git a/giambio/run.py b/giambio/runtime.py similarity index 100% rename from giambio/run.py rename to giambio/runtime.py diff --git a/giambio/socket.py b/giambio/socket.py index 3dd0bb2..8ef2d8d 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -17,7 +17,7 @@ limitations under the License. """ import socket as _socket -from .io import AsyncSocket +from giambio.io import AsyncSocket def wrap_socket(sock: _socket.socket) -> AsyncSocket: diff --git a/giambio/sync.py b/giambio/sync.py index 99764f3..ae6582b 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -15,7 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -from giambio.traps import event_wait, event_set, current_task +from typing import Any +from giambio.traps import event_wait, event_set from giambio.exceptions import GiambioError @@ -56,12 +57,16 @@ class Queue: NOT thread safe! """ - def __init__(self): + def __init__(self, maxsize: int): """ Object constructor """ self.events = {} + self.container = [] -# async def put + async def put(self, item: Any): + """ + + """ diff --git a/tests/events.py b/tests/events.py index 6f5938c..b024216 100644 --- a/tests/events.py +++ b/tests/events.py @@ -15,7 +15,7 @@ async def child(ev: giambio.Event, pause: int): await giambio.sleep(pause) end_sleep = giambio.clock() - start_sleep end_total = giambio.clock() - start_total - print(f"[child] Done! Slept for {end_total} seconds total ({end_pause} paused, {end_sleep} sleeping), nice nap!") + print(f"[child] Done! Slept for {end_total:.2f} seconds total ({end_pause:.2f} waiting, {end_sleep:.2f} sleeping), nice nap!") async def parent(pause: int = 1): @@ -29,7 +29,7 @@ async def parent(pause: int = 1): await event.trigger() print("[parent] Event set, awaiting child completion") end = giambio.clock() - start - print(f"[parent] Child exited in {end} seconds") + print(f"[parent] Child exited in {end:.2f} seconds") if __name__ == "__main__": diff --git a/tests/socket_ssl.py b/tests/socket_ssl.py index f313b82..d3c1a57 100644 --- a/tests/socket_ssl.py +++ b/tests/socket_ssl.py @@ -1,32 +1,48 @@ from debugger import Debugger +import email +from io import StringIO import giambio import socket as sock import ssl -async def test(host: str, port: int): +async def test(host: str, port: int, bufsize: int = 4096): socket = giambio.socket.wrap_socket( - ssl.wrap_socket( - sock.socket(), - do_handshake_on_connect=False) + ssl.create_default_context().wrap_socket( + sock=sock.socket(), + # Note: do_handshake_on_connect MUST + # be set to False on the synchronous socket! + # Giambio handles the TLS handshake asynchronously + # and making the SSL library handle it blocks + # the entire event loop. To perform the TLS + # handshake upon connection, set the this + # parameter in the AsyncSocket class instead + do_handshake_on_connect=False, + server_hostname=host) ) + print(f"Attempting a connection to {host}:{port}") await socket.connect((host, port)) + print("Connected") async with giambio.skip_after(2) as p: + print(f"Pool with {p.timeout - giambio.clock():.2f} seconds timeout created") async with socket: - await socket.send_all(b"""GET / HTTP/1.1\r - Host: google.com\r - User-Agent: owo\r - Accept: text/html\r - Connection: keep-alive\r\n\r\n""") + print("Entered socket context manager, sending request data") + await socket.send_all(b"""GET / HTTP/1.1\r\nHost: google.com\r\nUser-Agent: owo\r\nAccept: text/html\r\nConnection: keep-alive\r\nAccept: */*\r\n\r\n""") + print("Data sent") buffer = b"" - while True: - data = await socket.receive(4096) + while not buffer.endswith(b"\r\n\r\n"): + print(f"Requesting up to {bufsize} bytes (current response size: {len(buffer)})") + data = await socket.receive(bufsize) + print(f"Received {len(data)} bytes") if data: buffer += data else: + print("Received empty stream, closing connection") break - print("\n".join(buffer.decode().split("\r\n"))) - print(p.timed_out) + print(f"Request has{' not' if not p.timed_out else ''} timed out!") + if buffer: + print(f"HTTP Response below {'(might be incomplete)' if p.timed_out else ''}\n") + print("\n".join(buffer.decode().split("\r\n"))) giambio.run(test, "google.com", 443, debugger=()) diff --git a/tests/timeout.py b/tests/timeout.py index 515dd4c..01d4e50 100644 --- a/tests/timeout.py +++ b/tests/timeout.py @@ -23,4 +23,4 @@ async def main(): if __name__ == "__main__": - giambio.run(main, debugger=Debugger()) + giambio.run(main, debugger=())