Compare commits

...

4 Commits

Author SHA1 Message Date
Nocturn9x f81071f3b2 Fixes to cancellation 2023-02-22 17:43:14 +01:00
Nocturn9x 0560298f0f Fixed cancellation mechanism 2023-02-22 14:06:54 +01:00
Nocturn9x 4e41e46975 Added SSL socket test 2023-02-22 12:18:58 +01:00
Nocturn9x a1b40bd340 Various bug fixes 2023-02-22 12:18:45 +01:00
4 changed files with 141 additions and 52 deletions

View File

@ -157,11 +157,11 @@ async def wait(task: Task) -> Any | None:
raise SchedulerError("a task cannot join itself")
if current not in task.joiners:
# Luckily we use a set, so this has O(1)
# complexity
# complexity on average
await join(task) # Waiting implies joining!
await syscall("wait", task)
if task.exc and task.state != TaskState.CANCELLED and task.propagate:
# Task raised an error that wasn't directly caused by a cancellation:
# The task raised an error that wasn't directly caused by a cancellation:
# raise it, but do so only the first time wait was called
task.propagate = False
raise task.exc

View File

@ -31,7 +31,7 @@ from aiosched.errors import (
ResourceBroken,
)
from aiosched.context import TaskContext
from selectors import DefaultSelector, BaseSelector
from selectors import DefaultSelector, BaseSelector, EVENT_READ, EVENT_WRITE
class FIFOKernel:
@ -120,18 +120,13 @@ class FIFOKernel:
# There's tasks sleeping and/or on the
# ready queue!
return False
if self.selector.get_map():
for key in self.selector.get_map().values():
# We don't just do any([self.paused, self.run_ready, self.selector.get_map()])
# because we don't want to just know if there's any resources we're waiting on,
# but if there's at least one non-terminated task that owns a resource we're
# waiting on. This avoids issues such as the event loop never exiting if the
# user forgets to close a socket, for example
key.data: Task
if key.data.done():
continue
elif self.get_task_io(key.data):
return False
if self.get_active_io_count():
# We don't just do any([self.paused, self.run_ready, self.selector.get_map()])
# because we don't want to just know if there's any resources we're waiting on,
# but if there's at least one non-terminated task that owns a resource we're
# waiting on. This avoids issues such as the event loop never exiting if the
# user forgets to close a socket, for example
return False
return True
def close(self, force: bool = False):
@ -193,15 +188,15 @@ class FIFOKernel:
self.debugger.before_io(timeout)
# Get sockets that are ready and schedule their tasks
for key, _ in self.selector.select(timeout):
key.data: Task
if key.data.state == TaskState.IO:
key.data: dict[int, Task]
for task in key.data.values():
# We don't reschedule a task that wasn't
# blocking on I/O before: this way if a
# task waits on a socket and then goes to
# sleep, it won't be woken up early if the
# resource becomes available before its
# deadline expires
self.run_ready.append(key.data) # Resource ready? Schedule its task
self.run_ready.append(task) # Resource ready? Schedule its task
self.debugger.after_io(self.clock() - before_time)
def awake_tasks(self):
@ -261,10 +256,15 @@ class FIFOKernel:
# Sets the currently running task
self.current_task = self.run_ready.popleft()
while self.current_task.done():
# We need to make sure we don't try to execute
# exited tasks that are on the running queue
# We make sure not to schedule
# any terminated tasks. Might want
# to eventually get rid of this code,
# but for now it does the job
if not self.run_ready:
return # No more tasks to run!
# We'll let run() handle the I/O
# or the shutdown if necessary, as
# there are no more runnable tasks
return
self.current_task = self.run_ready.popleft()
# We nullify the exception object just in case the
# entry point raised and caught an error so that
@ -326,13 +326,13 @@ class FIFOKernel:
# the closest deadline to avoid starving sleeping tasks
# or missing deadlines
if self.selector.get_map():
self.wait_io()
self.handle_errors(self.wait_io)
if self.paused:
# Next we check for deadlines
self.awake_tasks()
self.handle_errors(self.awake_tasks)
else:
# Otherwise, while there are tasks ready to run, we run them!
self.handle_task_run(self.run_task_step)
self.handle_errors(self.run_task_step)
def start(
self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs
@ -369,6 +369,7 @@ class FIFOKernel:
if self.selector.get_map() and resource in self.selector.get_map():
self.selector.unregister(resource)
self.debugger.on_io_unschedule(resource)
self.reschedule_running()
def io_release_task(self, task: Task):
"""
@ -377,19 +378,26 @@ class FIFOKernel:
"""
for key in filter(
lambda k: k.data == task, dict(self.selector.get_map()).values()
lambda k: task in k.data.values(), dict(self.selector.get_map()).values()
):
self.notify_closing(key.fileobj, broken=True)
self.selector.unregister(key.fileobj)
task.last_io = ()
def get_task_io(self, task: Task) -> list:
def get_active_io_count(self) -> int:
"""
Returns the streams currently in use by
the given task
Returns the number of streams that are currently
being used by any active task
"""
return list(map(lambda k: k.fileobj, filter(lambda k: k.data == task, self.selector.get_map().values())))
result = 0
for key in (self.selector.get_map() or {}).values():
key.data: dict[int, Task]
for task in key.data.values():
if task.done():
continue
result += 1
return result
def notify_closing(self, stream, broken: bool = False):
"""
@ -407,10 +415,11 @@ class FIFOKernel:
lambda o: o.fileobj == stream,
dict(self.selector.get_map()).values(),
):
if k.data != self.current_task:
# We don't want to raise an error inside
# the task that's trying to close the stream!
self.handle_task_run(partial(k.data.throw, exc), k.data)
for task in k.data.values():
if task is not self.current_task:
# We don't want to raise an error inside
# the task that's trying to close the stream!
self.handle_errors(partial(k.data.throw, exc), k.data)
self.reschedule_running()
def cancel(self, task: Task):
@ -420,14 +429,14 @@ class FIFOKernel:
it fails
"""
self.io_release_task(task)
self.paused.discard(task)
self.handle_task_run(partial(task.throw, Cancelled(task)), task)
self.handle_errors(partial(task.throw, Cancelled(task)), task)
if task.state != TaskState.CANCELLED:
task.pending_cancellation = True
self.io_release_task(task)
self.paused.discard(task)
self.reschedule_running()
def handle_task_run(self, func: Callable, task: Task | None = None):
def handle_errors(self, func: Callable, task: Task | None = None):
"""
Convenience method for handling various exceptions
from tasks
@ -470,6 +479,7 @@ class FIFOKernel:
task = task or self.current_task
task.exc = err
task.state = TaskState.CRASHED
self.debugger.on_exception_raised(task, err)
self.wait(task)
def sleep(self, seconds: int | float):
@ -499,7 +509,6 @@ class FIFOKernel:
self.paused.discard(task)
self.io_release_task(task)
self.run_ready.extend(task.joiners)
self.reschedule_running()
def join(self, task: Task):
"""
@ -575,19 +584,19 @@ class FIFOKernel:
self.current_task.state = TaskState.IO
if self.current_task.last_io:
# Since, most of the time, tasks will perform multiple
# Since most of the time tasks will perform multiple
# I/O operations on a given resource, unregistering them
# every time isn't a sensible approach. A quick and
# easy optimization to address this problem is to
# store the last I/O operation that the task performed
# store the last I/O operation that the task performed,
# together with the resource itself, inside the task
# object. If the task then tries to perform the same
# operation on the same resource again, then this method
# returns immediately as it is already being watched by
# the selector. If the resource is the same, but the
# operation on the same resource again, this method then
# returns immediately as the resource is already being watched
# by the selector. If the resource is the same, but the
# event type has changed, then we modify the resource's
# associated event. Only if the resource is different from
# the last one used, then this method will register a new
# the last one used then this method will register a new
# one
if self.current_task.last_io == (evt_type, resource):
# Selector is already listening for that event on
@ -595,7 +604,7 @@ class FIFOKernel:
return
elif self.current_task.last_io[1] == resource:
# If the event to listen for has changed we just modify it
self.selector.modify(resource, evt_type, self.current_task)
self.selector.modify(resource, evt_type, {evt_type: self.current_task})
self.current_task.last_io = (evt_type, resource)
self.debugger.on_io_schedule(resource, evt_type)
elif not self.current_task.last_io or self.current_task.last_io[1] != resource:
@ -603,21 +612,26 @@ class FIFOKernel:
# I/O for the first time
self.current_task.last_io = evt_type, resource
try:
self.selector.register(resource, evt_type, self.current_task)
self.selector.register(resource, evt_type, {evt_type: self.current_task})
self.debugger.on_io_schedule(resource, evt_type)
except KeyError:
# The stream is already being used
key = self.selector.get_key(resource)
if key.data == self.current_task or evt_type != key.events:
if key.data[key.events] == self.current_task:
# If the task that registered the stream
# changed their mind on what they want
# to do with it, who are we to deny their
# request? We also modify the event in
# request?
self.selector.modify(resource, key.events | evt_type, {EVENT_READ: self.current_task,
EVENT_WRITE: self.current_task})
self.debugger.on_io_schedule(resource, evt_type)
elif key.events != evt_type:
# We also modify the event in
# our selector so that one task can read
# off a given stream while another one is
# writing to it
self.selector.modify(resource, evt_type, self.current_task)
self.debugger.on_io_schedule(resource, evt_type)
self.selector.modify(resource, key.events | evt_type, {evt_type: self.current_task,
key.events: list(key.data.values())[0]})
else:
# One task reading and one writing on the same
# resource is fine (think producer-consumer),

View File

@ -1,7 +1,7 @@
import sys
import logging
import aiosched
from debugger import Debugger
import logging
import sys
# A test to check for asynchronous I/O

75
tests/socket_ssl.py Normal file
View File

@ -0,0 +1,75 @@
from debugger import Debugger
import aiosched
import socket as sock
import ssl
import sys
import time
_print = print
def print(*args, **kwargs):
sys.stdout.write(f"[{time.strftime('%H:%M:%S')}] ")
_print(*args, **kwargs)
async def test(host: str, port: int, bufsize: int = 4096):
socket = aiosched.socket.wrap_socket(
ssl.create_default_context().wrap_socket(
sock=sock.socket(),
# Note: do_handshake_on_connect MUST
# be set to False on the synchronous socket!
# The library handles the TLS handshake asynchronously
# and making the SSL library handle it blocks
# the entire event loop. To perform the TLS
# handshake upon connection, set this parameter
# in the AsyncSocket class instead
do_handshake_on_connect=False,
server_hostname=host,
)
)
print(f"Attempting a connection to {host}:{port}")
await socket.connect((host, port))
print("Connected")
async with aiosched.with_context():
async with socket:
# Closes the socket automatically
print("Entered socket context manager, sending request data")
await socket.send_all(
f"GET / HTTP/1.1\r\nHost: {host}\r\nUser-Agent: owo\r\nAccept: text/html\r\nConnection: keep-alive\r\nAccept: */*\r\n\r\n".encode()
)
print("Data sent")
buffer = b""
while not buffer.endswith(b"\r\n\r\n"):
print(f"Requesting up to {bufsize} bytes (current response size: {len(buffer)})")
data = await socket.receive(bufsize)
if data:
print(f"Received {len(data)} bytes")
buffer += data
else:
print("Received empty stream, closing connection")
break
if buffer:
data = buffer.decode().split("\r\n")
print(f"HTTP Response below:")
_print(f"Response: {data[0]}")
_print("Headers:")
content = False
for i, element in enumerate(data):
if i == 0:
continue
else:
if not element.strip() and not content:
# This only works because google sends a newline
# before the content
sys.stdout.write("\nContent:")
content = True
if not content:
_print(f"\t{element}")
else:
for line in element.split("\n"):
_print(f"\t{line}")
_print("Done!")
aiosched.run(test, "debian.org", 443, 256, debugger=())