Fixed a bug with nested task scopes and deadlines. Add correctness checks to TaskPool and other minor changes

This commit is contained in:
Mattia Giambirtone 2024-03-19 21:44:50 +01:00
parent bc5e0f167f
commit c85e3037e4
5 changed files with 8 additions and 9 deletions

View File

@ -457,7 +457,7 @@ class BaseIOManager(ABC):
"""
@abstractmethod
def wait_io(self, current_time):
def wait_io(self):
"""
Waits for I/O and reschedules tasks
when data is ready to be read/written

View File

@ -151,6 +151,7 @@ class TaskPool:
raise exc_val.with_traceback(exc_tb)
elif not self.done():
await suspend()
assert self.done()
else:
await checkpoint()
except Cancelled as e:

View File

@ -271,7 +271,6 @@ class DefaultKernel(BaseKernel):
error = TimedOut("timed out")
error.scope = scope
self.throw(scope.owner, error)
self.reschedule(scope.owner)
def wakeup(self):
while (
@ -293,7 +292,7 @@ class DefaultKernel(BaseKernel):
self.wakeup()
self.check_scopes()
if self.io_manager.pending():
self.io_manager.wait_io(self.clock.current_time())
self.io_manager.wait_io()
def run(self):
"""

View File

@ -67,16 +67,16 @@ class SimpleIOManager(BaseIOManager):
if self._closed:
raise structio.exceptions.ResourceClosed("the I/O manager is closed")
def wait_io(self, current_time):
def wait_io(self):
self._check_closed()
kernel: BaseKernel = current_loop()
current_time = kernel.clock.current_time()
deadline = kernel.get_closest_deadline()
if deadline == float("inf"):
deadline = 0
elif deadline > 0:
deadline -= current_time
deadline = max(0, deadline)
current = kernel.clock.current_time()
readers = self._collect_readers()
writers = self._collect_writers()
kernel.event("before_io", deadline)
@ -86,7 +86,7 @@ class SimpleIOManager(BaseIOManager):
writers + readers,
deadline,
)
kernel.event("after_io", kernel.clock.current_time() - current)
kernel.event("after_io", kernel.clock.current_time() - current_time)
# On Windows, a successful connection is marked
# as an exceptional event rather than a write
# one

View File

@ -386,14 +386,13 @@ class AsyncSocket(AsyncResource):
if self._fd == -1:
raise ResourceClosed("I/O operation on closed socket")
with self.write_lock, self.read_lock:
connected = False
while not connected:
while True:
try:
self.socket.connect(address)
if self.do_handshake_on_connect:
await self.do_handshake()
connected = True
await checkpoint()
break
except WantRead:
await wait_readable(self._fd)
except WantWrite: