diff --git a/giambio/__init__.py b/giambio/__init__.py index c5a997e..529da7e 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -21,5 +21,11 @@ from .exceptions import GiambioError, AlreadyJoinedError, CancelledError from ._traps import sleep from ._layers import Event -__all__ = ["AsyncScheduler", "GiambioError", "AlreadyJoinedError", "CancelledError", "sleep", "Event"] - +__all__ = [ + "AsyncScheduler", + "GiambioError", + "AlreadyJoinedError", + "CancelledError", + "sleep", + "Event", +] diff --git a/giambio/_core.py b/giambio/_core.py index 01f11c2..8c628a4 100644 --- a/giambio/_core.py +++ b/giambio/_core.py @@ -45,10 +45,14 @@ class AsyncScheduler: self.tasks = deque() # Tasks that are ready to run self.selector = DefaultSelector() # Selector object to perform I/O multiplexing self.current_task = None # This will always point to the currently running coroutine (Task object) - self.joined = {} # Maps child tasks that need to be joined their respective parent task - self.clock = default_timer # Monotonic clock to keep track of elapsed time reliably + self.joined = ( + {} + ) # Maps child tasks that need to be joined their respective parent task + self.clock = ( + default_timer # Monotonic clock to keep track of elapsed time reliably + ) self.paused = TimeQueue(self.clock) # Tasks that are asleep - self.events = {} # All Event objects + self.events = {} # All Event objects self.event_waiting = defaultdict(list) # Coroutines waiting on event objects self.sequence = 0 @@ -59,10 +63,14 @@ class AsyncScheduler: give execution control to the loop itself.""" while True: - if not self.selector.get_map() and not any([self.paused, self.tasks, self.event_waiting]): # If there is nothing to do, just exit + if not self.selector.get_map() and not any( + [self.paused, self.tasks, self.event_waiting] + ): # If there is nothing to do, just exit break if not self.tasks: - if self.paused: # If there are no actively running tasks, we try to schedule the asleep ones + if ( + self.paused + ): # If there are no actively running tasks, we try to schedule the asleep ones try: self.check_sleeping() except BaseException as error: @@ -74,25 +82,31 @@ class AsyncScheduler: except BaseException as error: self.current_task.exc = error self.reschedule_parent(self.current_task) - while self.tasks: # While there are tasks to run - self.current_task = self.tasks.popleft() # Sets the currently running task + while self.tasks: # While there are tasks to run + self.current_task = ( + self.tasks.popleft() + ) # Sets the currently running task try: if self.current_task.status == "cancel": # Deferred cancellation self.current_task.cancelled = True self.current_task.throw(CancelledError(self.current_task)) - method, *args = self.current_task.run(self.current_task._notify) # Run a single step with the calculation (and awake event-waiting tasks if any) + method, *args = self.current_task.run( + self.current_task._notify + ) # Run a single step with the calculation (and awake event-waiting tasks if any) self.current_task.status = "run" - getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) + getattr(self, method)( + *args + ) # Sneaky method call, thanks to David Beazley for this ;) if self.event_waiting: self.check_events() except CancelledError as cancelled: - self.tasks.remove(cancelled.args[0]) # Remove the dead task + self.tasks.remove(cancelled.args[0]) # Remove the dead task self.tasks.append(self.current_task) - except StopIteration as e: # Coroutine ends + except StopIteration as e: # Coroutine ends self.current_task.result = e.args[0] if e.args else None self.current_task.finished = True self.reschedule_parent(self.current_task) - except BaseException as error: # Coroutine raised + except BaseException as error: # Coroutine raised self.current_task.exc = error self.reschedule_parent(self.current_task) @@ -110,8 +124,12 @@ class AsyncScheduler: def check_sleeping(self): """Checks and reschedules sleeping tasks""" - wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep until the closest deadline in order not to waste CPU cycles - while self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed + wait( + max(0.0, self.paused[0][0] - self.clock()) + ) # Sleep until the closest deadline in order not to waste CPU cycles + while ( + self.paused[0][0] < self.clock() + ): # Reschedules tasks when their deadline has elapsed self.tasks.append(self.paused.get()) if not self.paused: break @@ -119,8 +137,12 @@ class AsyncScheduler: def check_io(self): """Checks and schedules task to perform I/O""" - timeout = 0.0 if self.tasks else None # If there are no tasks ready wait indefinitely - io_ready = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks + timeout = ( + 0.0 if self.tasks else None + ) # If there are no tasks ready wait indefinitely + io_ready = self.selector.select( + timeout + ) # Get sockets that are ready and schedule their tasks for key, _ in io_ready: self.tasks.append(key.data) # Socket ready? Schedule the task @@ -167,7 +189,7 @@ class AsyncScheduler: self.current_task.status = "I/O" if self.current_task._last_io: if self.current_task._last_io == ("READ", sock): - return # Socket is already scheduled! + return # Socket is already scheduled! else: self.selector.unregister(sock) busy = False @@ -185,7 +207,7 @@ class AsyncScheduler: self.current_task.status = "I/O" if self.current_task._last_io: if self.current_task._last_io == ("WRITE", sock): - return # Socket is already scheduled! + return # Socket is already scheduled! else: self.selector.unregister(sock) # modify() causes issues busy = False @@ -205,7 +227,7 @@ class AsyncScheduler: if child.cancelled: # Task was cancelled and is therefore dead self.tasks.append(self.current_task) - elif child.exc: # Task raised an error, propagate it! + elif child.exc: # Task raised an error, propagate it! self.reschedule_parent(child) raise child.exc elif child.finished: @@ -214,7 +236,9 @@ class AsyncScheduler: if child not in self.joined: self.joined[child] = self.current_task else: - raise AlreadyJoinedError("Joining the same task multiple times is not allowed!") + raise AlreadyJoinedError( + "Joining the same task multiple times is not allowed!" + ) def sleep(self, seconds: int or float): """Puts the caller to sleep for a given amount of seconds""" @@ -250,11 +274,13 @@ class AsyncScheduler: in order to stop it from executing. The loop continues to execute as tasks are independent""" - if task.status in ("sleep", "I/O") and not task.cancelled: # It is safe to cancel a task while blocking + if ( + task.status in ("sleep", "I/O") and not task.cancelled + ): # It is safe to cancel a task while blocking task.cancelled = True task.throw(CancelledError(task)) elif task.status == "run": - task.status = "cancel" # Cancellation is deferred + task.status = "cancel" # Cancellation is deferred def wrap_socket(self, sock): """Wraps a standard socket into an AsyncSocket object""" @@ -300,4 +326,4 @@ class AsyncScheduler: await want_write(sock) err = sock.getsockopt(SOL_SOCKET, SO_ERROR) if err != 0: - raise OSError(err, f'Connect call failed: {addr}') + raise OSError(err, f"Connect call failed: {addr}") diff --git a/giambio/_layers.py b/giambio/_layers.py index 2acbbcc..44f8350 100644 --- a/giambio/_layers.py +++ b/giambio/_layers.py @@ -26,7 +26,7 @@ class Task: def __init__(self, coroutine: types.coroutine): self.coroutine = coroutine - self.cancelled = False # True if the task gets cancelled + self.cancelled = False # True if the task gets cancelled self.exc = None self.result = None self.finished = False @@ -117,5 +117,3 @@ class TimeQueue: def get(self): return heappop(self.container)[2] - - diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 96274af..3bbb35c 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -14,8 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. """ + class GiambioError(Exception): """Base class for gaimbio exceptions""" + pass @@ -33,14 +35,17 @@ class CancelledError(BaseException): class ResourceBusy(GiambioError): """Exception that is raised when a resource is accessed by more than one task at a time""" + pass class BrokenPipeError(GiambioError): """Wrapper around the broken pipe socket.error""" + pass class ResourceClosed(GiambioError): """Raised when I/O is attempted on a closed fd""" + pass diff --git a/giambio/socket.py b/giambio/socket.py index 83b5628..9bbb31f 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -21,8 +21,10 @@ limitations under the License. import socket from .exceptions import ResourceClosed from ._traps import sleep + try: from ssl import SSLWantReadError, SSLWantWriteError + WantRead = (BlockingIOError, InterruptedError, SSLWantReadError) WantWrite = (BlockingIOError, InterruptedError, SSLWantWriteError) except ImportError: @@ -67,7 +69,7 @@ class AsyncSocket(object): if self._closed: raise ResourceClosed("I/O operation on closed socket") - await sleep(0) # Give the scheduler the time to unregister the socket first + await sleep(0) # Give the scheduler the time to unregister the socket first await self.loop.close_sock(self.sock) self._closed = True @@ -86,4 +88,3 @@ class AsyncSocket(object): def __repr__(self): return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})" - diff --git a/setup.py b/setup.py index ed20cd6..7837f34 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ setuptools.setup( classifiers=[ "Programming Language :: Python :: 3", "Operating System :: OS Independent", - "License :: OSI Approved :: Apache License 2.0" + "License :: OSI Approved :: Apache License 2.0", ], - python_requires='>=3.6' + python_requires=">=3.6", ) diff --git a/tests/count.py b/tests/count.py index 558eaea..583d336 100644 --- a/tests/count.py +++ b/tests/count.py @@ -9,6 +9,7 @@ async def countdown(n: int): print("Countdown over") return 0 + async def countup(stop: int, step: int = 1): x = 0 while x < stop: @@ -32,7 +33,7 @@ async def main(): print(f"Countup returned: {up}\nCountdown returned: {down}") print("Task execution complete") + if __name__ == "__main__": scheduler = giambio.AsyncScheduler() scheduler.start(main()) - diff --git a/tests/server.py b/tests/server.py index 6a027e1..869c9fa 100644 --- a/tests/server.py +++ b/tests/server.py @@ -5,9 +5,9 @@ import logging sched = giambio.AsyncScheduler() -logging.basicConfig(level=20, - format="[%(levelname)s] %(asctime)s %(message)s", - datefmt='%d/%m/%Y %p') +logging.basicConfig( + level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p" +) async def server(address: tuple): @@ -31,7 +31,7 @@ async def echo_handler(sock: AsyncSocket, addr: tuple): if not data: break to_send_back = data - data = data.decode("utf-8").encode('unicode_escape') + data = data.decode("utf-8").encode("unicode_escape") logging.info(f"Got: '{data.decode('utf-8')}' from {addr}") await sock.send_all(b"Got: " + to_send_back) logging.info(f"Echoed back '{data.decode('utf-8')}' to {addr}") @@ -40,7 +40,6 @@ async def echo_handler(sock: AsyncSocket, addr: tuple): if __name__ == "__main__": try: - sched.start(server(('', 25001))) - except KeyboardInterrupt: # Exceptions propagate! + sched.start(server(("", 25001))) + except KeyboardInterrupt: # Exceptions propagate! print("Exiting...") -