Fixed minor socket bug on darwin kernel, nested pools now work as intended

This commit is contained in:
nocturn9x 2021-04-22 11:30:35 +02:00
parent 5b403703db
commit dcd3cae674
10 changed files with 120 additions and 93 deletions

View File

@ -151,9 +151,6 @@ class AsyncScheduler:
# we still need to make sure we don't try to execute # we still need to make sure we don't try to execute
# exited tasks that are on the running queue # exited tasks that are on the running queue
continue continue
# Sets the current pool: we need this to take nested pools
# into account and behave accordingly
self.current_pool = self.current_task.pool
if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out: if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out:
# Stores deadlines for tasks (deadlines are pool-specific). # Stores deadlines for tasks (deadlines are pool-specific).
# The deadlines queue will internally make sure not to store # The deadlines queue will internally make sure not to store
@ -283,6 +280,27 @@ class AsyncScheduler:
self.tasks.append(task) self.tasks.append(task)
self.debugger.after_sleep(task, slept) self.debugger.after_sleep(task, slept)
def get_closest_deadline(self) -> float:
"""
Gets the closest expiration deadline (asleep tasks, timeouts)
:return: The closest deadline according to our clock
:rtype: float
"""
if not self.deadlines:
# If there are no deadlines just wait until the first task wakeup
timeout = max(0.0, self.paused.get_closest_deadline() - self.clock())
elif not self.paused:
# If there are no sleeping tasks just wait until the first deadline
timeout = max(0.0, self.deadlines.get_closest_deadline() - self.clock())
else:
# If there are both deadlines AND sleeping tasks scheduled, we calculate
# the absolute closest deadline among the two sets and use that as a timeout
clock = self.clock()
timeout = min([max(0.0, self.paused.get_closest_deadline() - clock), self.deadlines.get_closest_deadline() - clock])
return timeout
def check_io(self): def check_io(self):
""" """
Checks for I/O and implements part of the sleeping mechanism Checks for I/O and implements part of the sleeping mechanism
@ -306,18 +324,7 @@ class AsyncScheduler:
timeout = 0.0 timeout = 0.0
elif self.paused or self.deadlines: elif self.paused or self.deadlines:
# If there are asleep tasks or deadlines, wait until the closest date # If there are asleep tasks or deadlines, wait until the closest date
if not self.deadlines: timeout = self.get_closest_deadline()
# If there are no deadlines just wait until the first task wakeup
timeout = min([max(0.0, self.paused.get_closest_deadline() - self.clock())])
elif not self.paused:
# If there are no sleeping tasks just wait until the first deadline
timeout = min([max(0.0, self.deadlines.get_closest_deadline() - self.clock())])
else:
# If there are both deadlines AND sleeping tasks scheduled, we calculate
# the absolute closest deadline among the two sets and use that as a timeout
clock = self.clock()
timeout = min([max(0.0, self.paused.get_closest_deadline() - clock),
self.deadlines.get_closest_deadline() - clock])
else: else:
# If there is *only* I/O, we wait a fixed amount of time # If there is *only* I/O, we wait a fixed amount of time
timeout = 86400 # Stolen from trio :D timeout = 86400 # Stolen from trio :D
@ -342,7 +349,7 @@ class AsyncScheduler:
if entry.exc: if entry.exc:
raise entry.exc raise entry.exc
def cancel_pool(self, pool: TaskManager): def cancel_pool(self, pool: TaskManager) -> bool:
""" """
Cancels all tasks in the given pool Cancels all tasks in the given pool
@ -362,7 +369,7 @@ class AsyncScheduler:
else: # If we're at the main task, we're sure everything else exited else: # If we're at the main task, we're sure everything else exited
return True return True
def get_event_tasks(self): def get_event_tasks(self) -> Task:
""" """
Yields all tasks currently waiting on events Yields all tasks currently waiting on events
""" """
@ -371,7 +378,7 @@ class AsyncScheduler:
for waiter in evt.waiters: for waiter in evt.waiters:
yield waiter yield waiter
def get_asleep_tasks(self): def get_asleep_tasks(self) -> Task:
""" """
Yields all tasks that are currently sleeping Yields all tasks that are currently sleeping
""" """
@ -379,7 +386,7 @@ class AsyncScheduler:
for asleep in self.paused.container: for asleep in self.paused.container:
yield asleep[2] # Deadline, tiebreaker, task yield asleep[2] # Deadline, tiebreaker, task
def get_io_tasks(self): def get_io_tasks(self) -> Task:
""" """
Yields all tasks currently waiting on I/O resources Yields all tasks currently waiting on I/O resources
""" """
@ -387,7 +394,7 @@ class AsyncScheduler:
for k in self.selector.get_map().values(): for k in self.selector.get_map().values():
yield k.data yield k.data
def get_all_tasks(self): def get_all_tasks(self) -> chain:
""" """
Returns a generator yielding all tasks which the loop is currently Returns a generator yielding all tasks which the loop is currently
keeping track of: this includes both running and paused tasks. keeping track of: this includes both running and paused tasks.
@ -463,6 +470,20 @@ class AsyncScheduler:
# occur # occur
self.tasks.append(t) self.tasks.append(t)
def is_pool_done(self, pool: TaskManager) -> bool:
"""
Returns true if the given pool has finished
running and can be safely terminated
:return: Whether the pool and any enclosing pools finished running
:rtype: bool
"""
if not pool:
# The parent task has no pool
return True
return pool.done()
def join(self, task: Task): def join(self, task: Task):
""" """
Joins a task to its callers (implicitly, the parent Joins a task to its callers (implicitly, the parent
@ -472,7 +493,7 @@ class AsyncScheduler:
task.joined = True task.joined = True
if task.finished or task.cancelled: if task.finished or task.cancelled:
if self.current_pool and self.current_pool.done() or not self.current_pool: if self.is_pool_done(self.current_pool):
# If the current pool has finished executing or we're at the first parent # If the current pool has finished executing or we're at the first parent
# task that kicked the loop, we can safely reschedule the parent(s) # task that kicked the loop, we can safely reschedule the parent(s)
self.reschedule_joiners(task) self.reschedule_joiners(task)
@ -512,6 +533,7 @@ class AsyncScheduler:
""" """
if task.done(): if task.done():
self.ensure_discard(task)
# The task isn't running already! # The task isn't running already!
return return
elif task.status in ("io", "sleep", "init"): elif task.status in ("io", "sleep", "init"):
@ -528,7 +550,7 @@ class AsyncScheduler:
# But we also need to cancel a task if it was not sleeping or waiting on # But we also need to cancel a task if it was not sleeping or waiting on
# any I/O because it could never do so (therefore blocking everything # any I/O because it could never do so (therefore blocking everything
# forever). So, when cancellation can't be done right away, we schedule # forever). So, when cancellation can't be done right away, we schedule
# if for the next execution step of the task. Giambio will also make sure # it for the next execution step of the task. Giambio will also make sure
# to re-raise cancellations at every checkpoint until the task lets the # to re-raise cancellations at every checkpoint until the task lets the
# exception propagate into us, because we *really* want the task to be # exception propagate into us, because we *really* want the task to be
# cancelled # cancelled
@ -642,7 +664,15 @@ class AsyncScheduler:
""" """
await want_read(sock) await want_read(sock)
return sock.accept() try:
return sock.accept()
except BlockingIOError:
# Some platforms (namely OSX systems) act weird and handle
# the errno 35 signal (EAGAIN) for sockets in a weird manner,
# and this seems to fix the issue. Not sure about why since we
# already called want_read above, but it ain't stupid if it works I guess
await want_read(sock)
return sock.accept()
# noinspection PyMethodMayBeStatic # noinspection PyMethodMayBeStatic
async def sock_sendall(self, sock: socket.socket, data: bytes): async def sock_sendall(self, sock: socket.socket, data: bytes):
@ -657,7 +687,11 @@ class AsyncScheduler:
while data: while data:
await want_write(sock) await want_write(sock)
sent_no = sock.send(data) try:
sent_no = sock.send(data)
except BlockingIOError:
await want_write(sock)
sent_no = sock.send(data)
data = data[sent_no:] data = data[sent_no:]
async def close_sock(self, sock: socket.socket): async def close_sock(self, sock: socket.socket):
@ -669,7 +703,11 @@ class AsyncScheduler:
""" """
await want_write(sock) await want_write(sock)
sock.close() try:
sock.close()
except BlockingIOError:
await want_write(sock)
sock.close()
self.selector.unregister(sock) self.selector.unregister(sock)
self.current_task.last_io = () self.current_task.last_io = ()
@ -687,4 +725,8 @@ class AsyncScheduler:
""" """
await want_write(sock) await want_write(sock)
return sock.connect(address_tuple) try:
return sock.connect(address_tuple)
except BlockingIOError:
await want_write(sock)
return sock.connect(address_tuple)

View File

@ -90,7 +90,10 @@ def create_pool():
Creates an async pool Creates an async pool
""" """
return TaskManager(get_event_loop()) loop = get_event_loop()
pool = TaskManager(loop)
loop.current_pool = pool
return pool
def with_timeout(timeout: int or float): def with_timeout(timeout: int or float):
@ -98,4 +101,7 @@ def with_timeout(timeout: int or float):
Creates an async pool with an associated timeout Creates an async pool with an associated timeout
""" """
return TaskManager(get_event_loop(), timeout) loop = get_event_loop()
pool = TaskManager(loop, timeout)
loop.current_pool = pool
return pool

View File

@ -5,8 +5,8 @@ with open("README.md", "r") as readme:
setuptools.setup( setuptools.setup(
name="GiambIO", name="GiambIO",
version="1.0", version="1.0.1",
author="Nocturn9x aka IsGiambyy", author="Nocturn9x",
author_email="hackhab@gmail.com", author_email="hackhab@gmail.com",
description="Asynchronous Python made easy (and friendly)", description="Asynchronous Python made easy (and friendly)",
long_description=long_description, long_description=long_description,
@ -18,5 +18,5 @@ setuptools.setup(
"Operating System :: OS Independent", "Operating System :: OS Independent",
"License :: OSI Approved :: Apache License 2.0", "License :: OSI Approved :: Apache License 2.0",
], ],
python_requires=">=3.6", python_requires=">=3.8",
) )

View File

View File

@ -2,23 +2,17 @@ import giambio
from debugger import Debugger from debugger import Debugger
async def child(): async def child(name: int):
print("[child] Child spawned!! Sleeping for 2 seconds") print(f"[child {name}] Child spawned!! Sleeping for {name} seconds")
await giambio.sleep(2) await giambio.sleep(name)
print("[child] Had a nice nap!") print(f"[child {name}] Had a nice nap!")
async def child1():
print("[child 1] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child 1] Had a nice nap!")
async def main(): async def main():
start = giambio.clock() start = giambio.clock()
async with giambio.create_pool() as pool: async with giambio.create_pool() as pool:
pool.spawn(child) # If you comment this line, the pool will exit immediately! pool.spawn(child, 1) # If you comment this line, the pool will exit immediately!
task = pool.spawn(child1) task = pool.spawn(child, 2)
await task.cancel() await task.cancel()
print("[main] Children spawned, awaiting completion") print("[main] Children spawned, awaiting completion")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")

View File

@ -1,36 +0,0 @@
import giambio
from debugger import Debugger
async def child():
print("[child] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child] Had a nice nap!")
async def child1():
print("[child 1] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child 1] Had a nice nap!")
async def child2():
print("[child 2] Child spawned!! Sleeping for 2 seconds")
await giambio.sleep(2)
print("[child 2] Had a nice nap!")
async def main():
start = giambio.clock()
async with giambio.create_pool() as pool:
pool.spawn(child)
pool.spawn(child1)
# async with giambio.create_pool() as a_pool:
# a_pool.spawn(child2)
await pool.cancel() # This cancels the *whole* block
print("[main] Children spawned, awaiting completion")
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":
giambio.run(main, debugger=Debugger())

View File

@ -32,4 +32,4 @@ async def parent(pause: int = 1):
if __name__ == "__main__": if __name__ == "__main__":
giambio.run(parent, 3) giambio.run(parent, 3)

26
tests/nested_pool.py Normal file
View File

@ -0,0 +1,26 @@
import giambio
from debugger import Debugger
async def child(name: int):
print(f"[child {name}] Child spawned!! Sleeping for {name} seconds")
await giambio.sleep(name)
print(f"[child {name}] Had a nice nap!")
async def main():
start = giambio.clock()
async with giambio.create_pool() as pool:
pool.spawn(child, 1)
pool.spawn(child, 2)
async with giambio.create_pool() as a_pool:
a_pool.spawn(child, 3)
a_pool.spawn(child, 4)
print("[main] Children spawned, awaiting completion")
# This will *only* execute when everything inside the async with block
# has ran, including any other pool
print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
if __name__ == "__main__":
giambio.run(main, debugger=())

View File

@ -67,3 +67,4 @@ if __name__ == "__main__":
logging.info("Ctrl+C detected, exiting") logging.info("Ctrl+C detected, exiting")
else: else:
logging.error(f"Exiting due to a {type(error).__name__}: {error}") logging.error(f"Exiting due to a {type(error).__name__}: {error}")
raise

View File

@ -2,16 +2,10 @@ import giambio
from debugger import Debugger from debugger import Debugger
async def child(): async def child(name: int):
print("[child] Child spawned!! Sleeping for 5 seconds") print(f"[child {name}] Child spawned!! Sleeping for {name} seconds")
await giambio.sleep(5) await giambio.sleep(name)
print("[child] Had a nice nap!") print(f"[child {name}] Had a nice nap!")
async def child1():
print("[child 1] Child spawned!! Sleeping for 10 seconds")
await giambio.sleep(10)
print("[child 1] Had a nice nap!")
async def main(): async def main():
@ -20,9 +14,9 @@ async def main():
async with giambio.with_timeout(6) as pool: async with giambio.with_timeout(6) as pool:
# TODO: We need to consider the inner part of # TODO: We need to consider the inner part of
# the with block as an implicit task, otherwise # the with block as an implicit task, otherwise
# timeouts and cancellations won't work properly! # timeouts and cancellations won't work with await fn()!
pool.spawn(child) # This will complete pool.spawn(child, 5) # This will complete
pool.spawn(child1) # This will not pool.spawn(child, 10) # This will not
print("[main] Children spawned, awaiting completion") print("[main] Children spawned, awaiting completion")
except giambio.exceptions.TooSlowError: except giambio.exceptions.TooSlowError:
print("[main] One or more children have timed out!") print("[main] One or more children have timed out!")