From dcd3cae6740ead31b308fced466a60d5d3ea220f Mon Sep 17 00:00:00 2001 From: nocturn9x Date: Thu, 22 Apr 2021 11:30:35 +0200 Subject: [PATCH] Fixed minor socket bug on darwin kernel, nested pools now work as intended --- giambio/core.py | 94 ++++++++++++++++++++++++++++++++------------ giambio/run.py | 10 ++++- setup.py | 6 +-- tests/__init__.py | 0 tests/cancel.py | 18 +++------ tests/cancel_pool.py | 36 ----------------- tests/events.py | 2 +- tests/nested_pool.py | 26 ++++++++++++ tests/server.py | 1 + tests/timeout.py | 20 ++++------ 10 files changed, 120 insertions(+), 93 deletions(-) delete mode 100644 tests/__init__.py delete mode 100644 tests/cancel_pool.py create mode 100644 tests/nested_pool.py diff --git a/giambio/core.py b/giambio/core.py index b1f23db..d725798 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -151,9 +151,6 @@ class AsyncScheduler: # we still need to make sure we don't try to execute # exited tasks that are on the running queue continue - # Sets the current pool: we need this to take nested pools - # into account and behave accordingly - self.current_pool = self.current_task.pool if self.current_pool and self.current_pool.timeout and not self.current_pool.timed_out: # Stores deadlines for tasks (deadlines are pool-specific). # The deadlines queue will internally make sure not to store @@ -283,6 +280,27 @@ class AsyncScheduler: self.tasks.append(task) self.debugger.after_sleep(task, slept) + def get_closest_deadline(self) -> float: + """ + Gets the closest expiration deadline (asleep tasks, timeouts) + + :return: The closest deadline according to our clock + :rtype: float + """ + + if not self.deadlines: + # If there are no deadlines just wait until the first task wakeup + timeout = max(0.0, self.paused.get_closest_deadline() - self.clock()) + elif not self.paused: + # If there are no sleeping tasks just wait until the first deadline + timeout = max(0.0, self.deadlines.get_closest_deadline() - self.clock()) + else: + # If there are both deadlines AND sleeping tasks scheduled, we calculate + # the absolute closest deadline among the two sets and use that as a timeout + clock = self.clock() + timeout = min([max(0.0, self.paused.get_closest_deadline() - clock), self.deadlines.get_closest_deadline() - clock]) + return timeout + def check_io(self): """ Checks for I/O and implements part of the sleeping mechanism @@ -306,18 +324,7 @@ class AsyncScheduler: timeout = 0.0 elif self.paused or self.deadlines: # If there are asleep tasks or deadlines, wait until the closest date - if not self.deadlines: - # If there are no deadlines just wait until the first task wakeup - timeout = min([max(0.0, self.paused.get_closest_deadline() - self.clock())]) - elif not self.paused: - # If there are no sleeping tasks just wait until the first deadline - timeout = min([max(0.0, self.deadlines.get_closest_deadline() - self.clock())]) - else: - # If there are both deadlines AND sleeping tasks scheduled, we calculate - # the absolute closest deadline among the two sets and use that as a timeout - clock = self.clock() - timeout = min([max(0.0, self.paused.get_closest_deadline() - clock), - self.deadlines.get_closest_deadline() - clock]) + timeout = self.get_closest_deadline() else: # If there is *only* I/O, we wait a fixed amount of time timeout = 86400 # Stolen from trio :D @@ -342,7 +349,7 @@ class AsyncScheduler: if entry.exc: raise entry.exc - def cancel_pool(self, pool: TaskManager): + def cancel_pool(self, pool: TaskManager) -> bool: """ Cancels all tasks in the given pool @@ -362,7 +369,7 @@ class AsyncScheduler: else: # If we're at the main task, we're sure everything else exited return True - def get_event_tasks(self): + def get_event_tasks(self) -> Task: """ Yields all tasks currently waiting on events """ @@ -371,7 +378,7 @@ class AsyncScheduler: for waiter in evt.waiters: yield waiter - def get_asleep_tasks(self): + def get_asleep_tasks(self) -> Task: """ Yields all tasks that are currently sleeping """ @@ -379,7 +386,7 @@ class AsyncScheduler: for asleep in self.paused.container: yield asleep[2] # Deadline, tiebreaker, task - def get_io_tasks(self): + def get_io_tasks(self) -> Task: """ Yields all tasks currently waiting on I/O resources """ @@ -387,7 +394,7 @@ class AsyncScheduler: for k in self.selector.get_map().values(): yield k.data - def get_all_tasks(self): + def get_all_tasks(self) -> chain: """ Returns a generator yielding all tasks which the loop is currently keeping track of: this includes both running and paused tasks. @@ -463,6 +470,20 @@ class AsyncScheduler: # occur self.tasks.append(t) + def is_pool_done(self, pool: TaskManager) -> bool: + """ + Returns true if the given pool has finished + running and can be safely terminated + + :return: Whether the pool and any enclosing pools finished running + :rtype: bool + """ + + if not pool: + # The parent task has no pool + return True + return pool.done() + def join(self, task: Task): """ Joins a task to its callers (implicitly, the parent @@ -472,7 +493,7 @@ class AsyncScheduler: task.joined = True if task.finished or task.cancelled: - if self.current_pool and self.current_pool.done() or not self.current_pool: + if self.is_pool_done(self.current_pool): # If the current pool has finished executing or we're at the first parent # task that kicked the loop, we can safely reschedule the parent(s) self.reschedule_joiners(task) @@ -512,6 +533,7 @@ class AsyncScheduler: """ if task.done(): + self.ensure_discard(task) # The task isn't running already! return elif task.status in ("io", "sleep", "init"): @@ -528,7 +550,7 @@ class AsyncScheduler: # But we also need to cancel a task if it was not sleeping or waiting on # any I/O because it could never do so (therefore blocking everything # forever). So, when cancellation can't be done right away, we schedule - # if for the next execution step of the task. Giambio will also make sure + # it for the next execution step of the task. Giambio will also make sure # to re-raise cancellations at every checkpoint until the task lets the # exception propagate into us, because we *really* want the task to be # cancelled @@ -642,7 +664,15 @@ class AsyncScheduler: """ await want_read(sock) - return sock.accept() + try: + return sock.accept() + except BlockingIOError: + # Some platforms (namely OSX systems) act weird and handle + # the errno 35 signal (EAGAIN) for sockets in a weird manner, + # and this seems to fix the issue. Not sure about why since we + # already called want_read above, but it ain't stupid if it works I guess + await want_read(sock) + return sock.accept() # noinspection PyMethodMayBeStatic async def sock_sendall(self, sock: socket.socket, data: bytes): @@ -657,7 +687,11 @@ class AsyncScheduler: while data: await want_write(sock) - sent_no = sock.send(data) + try: + sent_no = sock.send(data) + except BlockingIOError: + await want_write(sock) + sent_no = sock.send(data) data = data[sent_no:] async def close_sock(self, sock: socket.socket): @@ -669,7 +703,11 @@ class AsyncScheduler: """ await want_write(sock) - sock.close() + try: + sock.close() + except BlockingIOError: + await want_write(sock) + sock.close() self.selector.unregister(sock) self.current_task.last_io = () @@ -687,4 +725,8 @@ class AsyncScheduler: """ await want_write(sock) - return sock.connect(address_tuple) + try: + return sock.connect(address_tuple) + except BlockingIOError: + await want_write(sock) + return sock.connect(address_tuple) diff --git a/giambio/run.py b/giambio/run.py index c15d583..6ad3e71 100644 --- a/giambio/run.py +++ b/giambio/run.py @@ -90,7 +90,10 @@ def create_pool(): Creates an async pool """ - return TaskManager(get_event_loop()) + loop = get_event_loop() + pool = TaskManager(loop) + loop.current_pool = pool + return pool def with_timeout(timeout: int or float): @@ -98,4 +101,7 @@ def with_timeout(timeout: int or float): Creates an async pool with an associated timeout """ - return TaskManager(get_event_loop(), timeout) + loop = get_event_loop() + pool = TaskManager(loop, timeout) + loop.current_pool = pool + return pool diff --git a/setup.py b/setup.py index 4193cd7..c8e9aa5 100644 --- a/setup.py +++ b/setup.py @@ -5,8 +5,8 @@ with open("README.md", "r") as readme: setuptools.setup( name="GiambIO", - version="1.0", - author="Nocturn9x aka IsGiambyy", + version="1.0.1", + author="Nocturn9x", author_email="hackhab@gmail.com", description="Asynchronous Python made easy (and friendly)", long_description=long_description, @@ -18,5 +18,5 @@ setuptools.setup( "Operating System :: OS Independent", "License :: OSI Approved :: Apache License 2.0", ], - python_requires=">=3.6", + python_requires=">=3.8", ) diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/cancel.py b/tests/cancel.py index d76e54a..d55a80f 100644 --- a/tests/cancel.py +++ b/tests/cancel.py @@ -2,23 +2,17 @@ import giambio from debugger import Debugger -async def child(): - print("[child] Child spawned!! Sleeping for 2 seconds") - await giambio.sleep(2) - print("[child] Had a nice nap!") - - -async def child1(): - print("[child 1] Child spawned!! Sleeping for 2 seconds") - await giambio.sleep(2) - print("[child 1] Had a nice nap!") +async def child(name: int): + print(f"[child {name}] Child spawned!! Sleeping for {name} seconds") + await giambio.sleep(name) + print(f"[child {name}] Had a nice nap!") async def main(): start = giambio.clock() async with giambio.create_pool() as pool: - pool.spawn(child) # If you comment this line, the pool will exit immediately! - task = pool.spawn(child1) + pool.spawn(child, 1) # If you comment this line, the pool will exit immediately! + task = pool.spawn(child, 2) await task.cancel() print("[main] Children spawned, awaiting completion") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") diff --git a/tests/cancel_pool.py b/tests/cancel_pool.py deleted file mode 100644 index e5d5deb..0000000 --- a/tests/cancel_pool.py +++ /dev/null @@ -1,36 +0,0 @@ -import giambio -from debugger import Debugger - - -async def child(): - print("[child] Child spawned!! Sleeping for 2 seconds") - await giambio.sleep(2) - print("[child] Had a nice nap!") - - -async def child1(): - print("[child 1] Child spawned!! Sleeping for 2 seconds") - await giambio.sleep(2) - print("[child 1] Had a nice nap!") - - -async def child2(): - print("[child 2] Child spawned!! Sleeping for 2 seconds") - await giambio.sleep(2) - print("[child 2] Had a nice nap!") - - -async def main(): - start = giambio.clock() - async with giambio.create_pool() as pool: - pool.spawn(child) - pool.spawn(child1) - # async with giambio.create_pool() as a_pool: - # a_pool.spawn(child2) - await pool.cancel() # This cancels the *whole* block - print("[main] Children spawned, awaiting completion") - print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") - - -if __name__ == "__main__": - giambio.run(main, debugger=Debugger()) diff --git a/tests/events.py b/tests/events.py index f5b0e95..cb63d67 100644 --- a/tests/events.py +++ b/tests/events.py @@ -32,4 +32,4 @@ async def parent(pause: int = 1): if __name__ == "__main__": - giambio.run(parent, 3) \ No newline at end of file + giambio.run(parent, 3) diff --git a/tests/nested_pool.py b/tests/nested_pool.py new file mode 100644 index 0000000..0845438 --- /dev/null +++ b/tests/nested_pool.py @@ -0,0 +1,26 @@ +import giambio +from debugger import Debugger + + +async def child(name: int): + print(f"[child {name}] Child spawned!! Sleeping for {name} seconds") + await giambio.sleep(name) + print(f"[child {name}] Had a nice nap!") + + +async def main(): + start = giambio.clock() + async with giambio.create_pool() as pool: + pool.spawn(child, 1) + pool.spawn(child, 2) + async with giambio.create_pool() as a_pool: + a_pool.spawn(child, 3) + a_pool.spawn(child, 4) + print("[main] Children spawned, awaiting completion") + # This will *only* execute when everything inside the async with block + # has ran, including any other pool + print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") + + +if __name__ == "__main__": + giambio.run(main, debugger=()) diff --git a/tests/server.py b/tests/server.py index aee89f0..91a58b8 100644 --- a/tests/server.py +++ b/tests/server.py @@ -67,3 +67,4 @@ if __name__ == "__main__": logging.info("Ctrl+C detected, exiting") else: logging.error(f"Exiting due to a {type(error).__name__}: {error}") + raise diff --git a/tests/timeout.py b/tests/timeout.py index ca646e8..e7e6267 100644 --- a/tests/timeout.py +++ b/tests/timeout.py @@ -2,16 +2,10 @@ import giambio from debugger import Debugger -async def child(): - print("[child] Child spawned!! Sleeping for 5 seconds") - await giambio.sleep(5) - print("[child] Had a nice nap!") - - -async def child1(): - print("[child 1] Child spawned!! Sleeping for 10 seconds") - await giambio.sleep(10) - print("[child 1] Had a nice nap!") +async def child(name: int): + print(f"[child {name}] Child spawned!! Sleeping for {name} seconds") + await giambio.sleep(name) + print(f"[child {name}] Had a nice nap!") async def main(): @@ -20,9 +14,9 @@ async def main(): async with giambio.with_timeout(6) as pool: # TODO: We need to consider the inner part of # the with block as an implicit task, otherwise - # timeouts and cancellations won't work properly! - pool.spawn(child) # This will complete - pool.spawn(child1) # This will not + # timeouts and cancellations won't work with await fn()! + pool.spawn(child, 5) # This will complete + pool.spawn(child, 10) # This will not print("[main] Children spawned, awaiting completion") except giambio.exceptions.TooSlowError: print("[main] One or more children have timed out!")