(Hopefully) Fixed issues with blockinb I/O and timeouts

This commit is contained in:
Nocturn9x 2022-02-04 11:56:15 +01:00
parent 4e1d328df4
commit f7fbad931a
10 changed files with 80 additions and 45 deletions

View File

@ -21,10 +21,10 @@ __version__ = (0, 0, 1)
from . import exceptions, socket, context, core, task, io from . import exceptions, socket, context, core, task, io
from .traps import sleep, current_task from giambio.traps import sleep, current_task
from .sync import Event from giambio.sync import Event
from .run import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after from giambio.runtime import run, clock, create_pool, get_event_loop, new_event_loop, with_timeout, skip_after
from .util import debug from giambio.util import debug
__all__ = [ __all__ = [

View File

@ -20,9 +20,10 @@ limitations under the License.
import types import types
from giambio.task import Task from giambio.task import Task
from collections import deque from collections import deque
from functools import partial
from timeit import default_timer from timeit import default_timer
from giambio.context import TaskManager from giambio.context import TaskManager
from typing import List, Optional, Any, Dict from typing import Callable, List, Optional, Any, Dict
from giambio.util.debug import BaseDebugger from giambio.util.debug import BaseDebugger
from giambio.internal import TimeQueue, DeadlinesQueue from giambio.internal import TimeQueue, DeadlinesQueue
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
@ -125,6 +126,7 @@ class AsyncScheduler:
self.io_skip_limit = io_skip_limit or 5 self.io_skip_limit = io_skip_limit or 5
# The max. I/O timeout # The max. I/O timeout
self.io_max_timeout = io_max_timeout or 86400 self.io_max_timeout = io_max_timeout or 86400
self.entry_point: Optional[Task] = None
def __repr__(self): def __repr__(self):
""" """
@ -211,12 +213,7 @@ class AsyncScheduler:
self.check_io() self.check_io()
if self.deadlines: if self.deadlines:
# Deadline expiration is our next step # Deadline expiration is our next step
try: self.prune_deadlines()
self.prune_deadlines()
except TooSlowError as t:
task = t.args[0]
task.exc = t
self.join(task)
if self.paused: if self.paused:
# Next we try to (re)schedule the asleep tasks # Next we try to (re)schedule the asleep tasks
self.awake_sleeping() self.awake_sleeping()
@ -409,6 +406,26 @@ class AsyncScheduler:
self._data[self.current_task] = self self._data[self.current_task] = self
self.reschedule_running() self.reschedule_running()
def handle_task_exit(self, task: Task, to_call: Callable):
"""
Convenience method for handling StopIteration
exceptions from tasks
"""
try:
to_call()
except StopIteration as ret:
task.status = "end"
task.result = ret.value
task.finished = True
self.join(task)
self.tasks.remove(task)
except BaseException as err:
task.exc = err
self.join(task)
if task in self.tasks:
self.tasks.remove(task)
def prune_deadlines(self): def prune_deadlines(self):
""" """
Removes expired deadlines after their timeout Removes expired deadlines after their timeout
@ -417,14 +434,14 @@ class AsyncScheduler:
while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock(): while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock():
pool = self.deadlines.get() pool = self.deadlines.get()
if pool.done():
continue
pool.timed_out = True pool.timed_out = True
if not pool.tasks and self.current_task is self.entry_point:
self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point)))
for task in pool.tasks: for task in pool.tasks:
if not task.done(): if not task.done():
self.paused.discard(task) self.paused.discard(task)
self.io_release_task(task) self.io_release_task(task)
task.throw(TooSlowError(task)) self.handle_task_exit(task, partial(task.throw, TooSlowError(task)))
def schedule_tasks(self, tasks: List[Task]): def schedule_tasks(self, tasks: List[Task]):
""" """
@ -448,7 +465,6 @@ class AsyncScheduler:
# expected # expected
if t.done() or t in self.run_ready: if t.done() or t in self.run_ready:
self.paused.discard(t) self.paused.discard(t)
print(t is self.current_task)
while self.paused and self.paused.get_closest_deadline() <= self.clock(): while self.paused and self.paused.get_closest_deadline() <= self.clock():
# Reschedules tasks when their deadline has elapsed # Reschedules tasks when their deadline has elapsed
task = self.paused.get() task = self.paused.get()
@ -525,6 +541,7 @@ class AsyncScheduler:
entry = Task(func.__name__ or str(func), func(*args), None) entry = Task(func.__name__ or str(func), func(*args), None)
self.tasks.append(entry) self.tasks.append(entry)
self.entry_point = entry
self.run_ready.append(entry) self.run_ready.append(entry)
self.debugger.on_start() self.debugger.on_start()
if loop: if loop:

View File

@ -29,7 +29,7 @@ class TimeQueue:
:param clock: The same monotonic clock that was passed to the thread-local event loop. :param clock: The same monotonic clock that was passed to the thread-local event loop.
It is important for the queue to be synchronized with the loop as this allows It is important for the queue to be synchronized with the loop as this allows
the sleeping mechanism to work reliably the sleeping mechanism to work reliably
""" """
def __init__(self, clock): def __init__(self, clock):
@ -44,7 +44,7 @@ class TimeQueue:
self.sequence = 0 self.sequence = 0
self.container: List[Tuple[float, int, Task]] = [] self.container: List[Tuple[float, int, Task]] = []
def __contains__(self, item): def __contains__(self, item: Task):
""" """
Implements item in self. This method behaves Implements item in self. This method behaves
as if the queue only contained tasks and ignores as if the queue only contained tasks and ignores
@ -56,7 +56,7 @@ class TimeQueue:
return True return True
return False return False
def index(self, item): def index(self, item: Task):
""" """
Returns the index of the given item in the list Returns the index of the given item in the list
or -1 if it is not present or -1 if it is not present
@ -67,7 +67,7 @@ class TimeQueue:
return i return i
return -1 return -1
def discard(self, item): def discard(self, item: Task):
""" """
Discards an item from the queue and Discards an item from the queue and
calls heapify(self.container) to keep calls heapify(self.container) to keep
@ -112,7 +112,7 @@ class TimeQueue:
except IndexError: except IndexError:
raise StopIteration from None raise StopIteration from None
def __getitem__(self, item): def __getitem__(self, item: int):
""" """
Implements self[n] Implements self[n]
""" """

View File

@ -16,9 +16,6 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import ssl
from socket import SOL_SOCKET, SO_ERROR
import socket as builtin_socket
from giambio.exceptions import ResourceClosed from giambio.exceptions import ResourceClosed
from giambio.traps import want_write, want_read, io_release from giambio.traps import want_write, want_read, io_release
@ -114,8 +111,8 @@ class AsyncSocket:
raise ResourceClosed("I/O operation on closed socket") raise ResourceClosed("I/O operation on closed socket")
await io_release(self.sock) await io_release(self.sock)
self.sock.close() self.sock.close()
self._sock = None self._fd = -1
self.sock = -1 self.sock = None
async def shutdown(self, how): async def shutdown(self, how):
""" """

View File

@ -17,7 +17,7 @@ limitations under the License.
""" """
import socket as _socket import socket as _socket
from .io import AsyncSocket from giambio.io import AsyncSocket
def wrap_socket(sock: _socket.socket) -> AsyncSocket: def wrap_socket(sock: _socket.socket) -> AsyncSocket:

View File

@ -15,7 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
from giambio.traps import event_wait, event_set, current_task from typing import Any
from giambio.traps import event_wait, event_set
from giambio.exceptions import GiambioError from giambio.exceptions import GiambioError
@ -56,12 +57,16 @@ class Queue:
NOT thread safe! NOT thread safe!
""" """
def __init__(self): def __init__(self, maxsize: int):
""" """
Object constructor Object constructor
""" """
self.events = {} self.events = {}
self.container = []
# async def put async def put(self, item: Any):
"""
"""

View File

@ -15,7 +15,7 @@ async def child(ev: giambio.Event, pause: int):
await giambio.sleep(pause) await giambio.sleep(pause)
end_sleep = giambio.clock() - start_sleep end_sleep = giambio.clock() - start_sleep
end_total = giambio.clock() - start_total end_total = giambio.clock() - start_total
print(f"[child] Done! Slept for {end_total} seconds total ({end_pause} paused, {end_sleep} sleeping), nice nap!") print(f"[child] Done! Slept for {end_total:.2f} seconds total ({end_pause:.2f} waiting, {end_sleep:.2f} sleeping), nice nap!")
async def parent(pause: int = 1): async def parent(pause: int = 1):
@ -29,7 +29,7 @@ async def parent(pause: int = 1):
await event.trigger() await event.trigger()
print("[parent] Event set, awaiting child completion") print("[parent] Event set, awaiting child completion")
end = giambio.clock() - start end = giambio.clock() - start
print(f"[parent] Child exited in {end} seconds") print(f"[parent] Child exited in {end:.2f} seconds")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,32 +1,48 @@
from debugger import Debugger from debugger import Debugger
import email
from io import StringIO
import giambio import giambio
import socket as sock import socket as sock
import ssl import ssl
async def test(host: str, port: int): async def test(host: str, port: int, bufsize: int = 4096):
socket = giambio.socket.wrap_socket( socket = giambio.socket.wrap_socket(
ssl.wrap_socket( ssl.create_default_context().wrap_socket(
sock.socket(), sock=sock.socket(),
do_handshake_on_connect=False) # Note: do_handshake_on_connect MUST
# be set to False on the synchronous socket!
# Giambio handles the TLS handshake asynchronously
# and making the SSL library handle it blocks
# the entire event loop. To perform the TLS
# handshake upon connection, set the this
# parameter in the AsyncSocket class instead
do_handshake_on_connect=False,
server_hostname=host)
) )
print(f"Attempting a connection to {host}:{port}")
await socket.connect((host, port)) await socket.connect((host, port))
print("Connected")
async with giambio.skip_after(2) as p: async with giambio.skip_after(2) as p:
print(f"Pool with {p.timeout - giambio.clock():.2f} seconds timeout created")
async with socket: async with socket:
await socket.send_all(b"""GET / HTTP/1.1\r print("Entered socket context manager, sending request data")
Host: google.com\r await socket.send_all(b"""GET / HTTP/1.1\r\nHost: google.com\r\nUser-Agent: owo\r\nAccept: text/html\r\nConnection: keep-alive\r\nAccept: */*\r\n\r\n""")
User-Agent: owo\r print("Data sent")
Accept: text/html\r
Connection: keep-alive\r\n\r\n""")
buffer = b"" buffer = b""
while True: while not buffer.endswith(b"\r\n\r\n"):
data = await socket.receive(4096) print(f"Requesting up to {bufsize} bytes (current response size: {len(buffer)})")
data = await socket.receive(bufsize)
print(f"Received {len(data)} bytes")
if data: if data:
buffer += data buffer += data
else: else:
print("Received empty stream, closing connection")
break break
print("\n".join(buffer.decode().split("\r\n"))) print(f"Request has{' not' if not p.timed_out else ''} timed out!")
print(p.timed_out) if buffer:
print(f"HTTP Response below {'(might be incomplete)' if p.timed_out else ''}\n")
print("\n".join(buffer.decode().split("\r\n")))
giambio.run(test, "google.com", 443, debugger=()) giambio.run(test, "google.com", 443, debugger=())

View File

@ -23,4 +23,4 @@ async def main():
if __name__ == "__main__": if __name__ == "__main__":
giambio.run(main, debugger=Debugger()) giambio.run(main, debugger=())