Formatted code following black style

This commit is contained in:
nocturn9x 2020-07-13 22:04:06 +02:00
parent ee8b83a370
commit 7d3b5da90f
13 changed files with 100 additions and 51 deletions

View File

@ -23,5 +23,11 @@ from ._layers import Event
from ._managers import TaskManager
__all__ = ["AsyncScheduler", "GiambioError", "CancelledError", "sleep", "Event", "TaskManager"]
__all__ = [
"AsyncScheduler",
"GiambioError",
"CancelledError",
"sleep",
"Event",
"TaskManager",
]

View File

@ -45,10 +45,14 @@ class AsyncScheduler:
self.tasks = deque() # Tasks that are ready to run
self.selector = DefaultSelector() # Selector object to perform I/O multiplexing
self.current_task = None # This will always point to the currently running coroutine (Task object)
self.joined = defaultdict(list) # Maps child tasks that need to be joined their respective parent task(s)
self.clock = default_timer # Monotonic clock to keep track of elapsed time reliably
self.joined = defaultdict(
list
) # Maps child tasks that need to be joined their respective parent task(s)
self.clock = (
default_timer # Monotonic clock to keep track of elapsed time reliably
)
self.paused = TimeQueue(self.clock) # Tasks that are asleep
self.events = {} # All Event objects
self.events = {} # All Event objects
self.event_waiting = defaultdict(list) # Coroutines waiting on event objects
self.sequence = 0
@ -59,23 +63,31 @@ class AsyncScheduler:
give execution control to the loop itself."""
while True:
if not self.selector.get_map() and not any([self.paused, self.tasks, self.event_waiting]): # If there is nothing to do, just exit
if not self.selector.get_map() and not any(
[self.paused, self.tasks, self.event_waiting]
): # If there is nothing to do, just exit
break
if not self.tasks:
if self.paused: # If there are no actively running tasks, we try to schedule the asleep ones
if (
self.paused
): # If there are no actively running tasks, we try to schedule the asleep ones
try:
self.check_sleeping()
except Exception as error:
self.current_task.exc = error
self.reschedule_parent(self.current_task)
if self.selector.get_map(): # This schedules task that are ready to perform I/O
if (
self.selector.get_map()
): # This schedules task that are ready to perform I/O
try:
self.check_io()
except Exception as error:
self.current_task.exc = error
self.reschedule_parent(self.current_task)
while self.tasks: # While there are tasks to run
self.current_task = self.tasks.popleft() # Sets the currently running task
while self.tasks: # While there are tasks to run
self.current_task = (
self.tasks.popleft()
) # Sets the currently running task
if self.current_task.cancelled:
continue
try:
@ -83,16 +95,22 @@ class AsyncScheduler:
self.current_task.cancelled = True
self.current_task.throw(CancelledError(self.current_task))
else:
method, *args = self.current_task.run(self.current_task._notify) # Run a single step with the calculation (and awake event-waiting tasks if any)
method, *args = self.current_task.run(
self.current_task._notify
) # Run a single step with the calculation (and awake event-waiting tasks if any)
self.current_task.status = "run"
getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;)
if self.event_waiting: # Schedules tasks that are waiting on events
getattr(self, method)(
*args
) # Sneaky method call, thanks to David Beazley for this ;)
if (
self.event_waiting
): # Schedules tasks that are waiting on events
self.check_events()
except CancelledError as cancelled:
if cancelled.args[0] in self.tasks:
self.tasks.remove(cancelled.args[0]) # Remove the dead task
self.tasks.remove(cancelled.args[0]) # Remove the dead task
self.tasks.append(self.current_task)
except StopIteration as e: # Coroutine ends
except StopIteration as e: # Coroutine ends
self.current_task.result = e.args[0] if e.args else None
self.current_task.finished = True
self.reschedule_parent(self.current_task)
@ -114,8 +132,12 @@ class AsyncScheduler:
def check_sleeping(self):
"""Checks and reschedules sleeping tasks"""
wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep until the closest deadline in order not to waste CPU cycles
while self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed
wait(
max(0.0, self.paused[0][0] - self.clock())
) # Sleep until the closest deadline in order not to waste CPU cycles
while (
self.paused[0][0] < self.clock()
): # Reschedules tasks when their deadline has elapsed
self.tasks.append(self.paused.get())
if not self.paused:
break
@ -123,8 +145,12 @@ class AsyncScheduler:
def check_io(self):
"""Checks and schedules task to perform I/O"""
timeout = 0.0 if self.tasks else None # If there are no tasks ready wait indefinitely
io_ready = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks
timeout = (
0.0 if self.tasks else None
) # If there are no tasks ready wait indefinitely
io_ready = self.selector.select(
timeout
) # Get sockets that are ready and schedule their tasks
for key, _ in io_ready:
self.tasks.append(key.data) # Socket ready? Schedule the task
@ -165,7 +191,7 @@ class AsyncScheduler:
self.current_task.status = "I/O"
if self.current_task._last_io:
if self.current_task._last_io == ("READ", sock):
return # Socket is already scheduled!
return # Socket is already scheduled!
else:
self.selector.unregister(sock)
busy = False
@ -183,7 +209,7 @@ class AsyncScheduler:
self.current_task.status = "I/O"
if self.current_task._last_io:
if self.current_task._last_io == ("WRITE", sock):
return # Socket is already scheduled!
return # Socket is already scheduled!
else:
self.selector.unregister(sock) # modify() causes issues
busy = False
@ -246,11 +272,14 @@ class AsyncScheduler:
if task.cancelled or task.finished:
self.tasks.append(self.current_task)
elif task.status in ("sleep", "I/O"): # It is safe to cancel a task while blocking
elif task.status in (
"sleep",
"I/O",
): # It is safe to cancel a task while blocking
task.cancelled = True
task.throw(CancelledError(task))
else:
task.status = "cancel" # Cancellation is deferred
task.status = "cancel" # Cancellation is deferred
def wrap_socket(self, sock):
"""Wraps a standard socket into an AsyncSocket object"""
@ -296,4 +325,4 @@ class AsyncScheduler:
await want_write(sock)
err = sock.getsockopt(SOL_SOCKET, SO_ERROR)
if err != 0:
raise OSError(err, f'Connect call failed: {addr}')
raise OSError(err, f"Connect call failed: {addr}")

View File

@ -26,7 +26,7 @@ class Task:
def __init__(self, coroutine: types.coroutine):
self.coroutine = coroutine
self.cancelled = False # True if the task gets cancelled
self.cancelled = False # True if the task gets cancelled
self.exc = None
self.result = None
self.finished = False
@ -121,5 +121,3 @@ class TimeQueue:
def get(self):
return heappop(self.container)[2]

View File

@ -21,11 +21,15 @@ class TaskManager(object):
"""Cancels all tasks and raises an exception"""
exc = ErrorStack()
for task in itertools.chain(self.scheduler.tasks.copy(), self.scheduler.paused.items(), *self.scheduler.event_waiting.values()):
try:
await task.cancel()
except Exception as err:
exc.errors.append(err)
for task in itertools.chain(
self.scheduler.tasks.copy(),
self.scheduler.paused.items(),
*self.scheduler.event_waiting.values()
):
try:
await task.cancel()
except Exception as err:
exc.errors.append(err)
if exc.errors:
exc.errors.insert(err, 0)
raise exc
@ -35,7 +39,9 @@ class TaskManager(object):
"""Implements async with self"""
while True:
tasks = itertools.chain(self.scheduler.tasks.copy(), self.scheduler.paused.items())
tasks = itertools.chain(
self.scheduler.tasks.copy(), self.scheduler.paused.items()
)
for task in tasks:
try:
self.values[task] = await task.join()

View File

@ -29,4 +29,6 @@ def run(coro: coroutine):
token += thread.name
token += str(thread.native_id)
token += str(thread.ident)
token = sha256(token.encode()).hexdigest() # Unique token specific to a given thread at a given time
token = sha256(
token.encode()
).hexdigest() # Unique token specific to a given thread at a given time

View File

@ -55,7 +55,9 @@ def join(task):
if task.exc:
raise task.exc
elif not task.finished and not task.cancelled:
raise GiambioCriticalError(f"Task {task} did not terminate properly, the event loop might be in an inconsistent state!")
raise GiambioCriticalError(
f"Task {task} did not terminate properly, the event loop might be in an inconsistent state!"
)
return task.result

View File

@ -19,6 +19,7 @@ from traceback import format_exception
class GiambioError(Exception):
"""Base class for giambio exceptions"""
pass
@ -33,11 +34,13 @@ class CancelledError(GiambioCriticalError):
class ResourceBusy(GiambioError):
"""Exception that is raised when a resource is accessed by more than
one task at a time"""
pass
class BrokenPipeError(GiambioError):
"""Wrapper around the broken pipe socket.error"""
pass
@ -56,8 +59,10 @@ class ErrorStack(GiambioError):
def __str__(self):
"""Taken from anyio"""
tb_list = ['\n'.join(format_exception(type(err), err, err.__traceback__))
for err in self.errors]
tb_list = [
"\n".join(format_exception(type(err), err, err.__traceback__))
for err in self.errors
]
return f"{len(self.errors)} errors occurred, details below\n{self.SEP}{self.SEP.join(tb_list)}"
def __repr__(self):

View File

@ -21,8 +21,10 @@ limitations under the License.
import socket
from .exceptions import ResourceClosed
from ._traps import sleep
try:
from ssl import SSLWantReadError, SSLWantWriteError
WantRead = (BlockingIOError, InterruptedError, SSLWantReadError)
WantWrite = (BlockingIOError, InterruptedError, SSLWantWriteError)
except ImportError:
@ -67,7 +69,7 @@ class AsyncSocket(object):
if self._closed:
raise ResourceClosed("I/O operation on closed socket")
await sleep(0) # Give the scheduler the time to unregister the socket first
await sleep(0) # Give the scheduler the time to unregister the socket first
await self.loop.close_sock(self.sock)
self._closed = True
@ -86,4 +88,3 @@ class AsyncSocket(object):
def __repr__(self):
return f"giambio.socket.AsyncSocket({self.sock}, {self.loop})"

View File

@ -16,7 +16,7 @@ setuptools.setup(
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
"License :: OSI Approved :: Apache License 2.0"
"License :: OSI Approved :: Apache License 2.0",
],
python_requires='>=3.6'
python_requires=">=3.6",
)

View File

@ -9,6 +9,7 @@ async def countdown(n: int):
print("Countdown over")
return 0
async def countup(stop: int, step: int = 1):
x = 0
while x < stop:
@ -32,7 +33,7 @@ async def main():
print(f"Countup returned: {up}\nCountdown returned: {down}")
print("Task execution complete")
if __name__ == "__main__":
scheduler = giambio.AsyncScheduler()
scheduler.start(main())

View File

@ -9,6 +9,7 @@ async def countdown(n: int):
print("Countdown over")
return 0
async def countup(stop: int, step: int = 1):
x = 0
while x < stop:
@ -30,7 +31,7 @@ async def main():
print("Countup cancelled")
print("Task execution complete")
if __name__ == "__main__":
scheduler = giambio.AsyncScheduler()
scheduler.start(main())

View File

@ -38,4 +38,3 @@ async def main():
if __name__ == "__main__":
scheduler = giambio.AsyncScheduler()
scheduler.start(main())

View File

@ -5,9 +5,9 @@ import logging
sched = giambio.AsyncScheduler()
logging.basicConfig(level=20,
format="[%(levelname)s] %(asctime)s %(message)s",
datefmt='%d/%m/%Y %p')
logging.basicConfig(
level=20, format="[%(levelname)s] %(asctime)s %(message)s", datefmt="%d/%m/%Y %p"
)
async def server(address: tuple):
@ -31,7 +31,7 @@ async def echo_handler(sock: AsyncSocket, addr: tuple):
if not data:
break
to_send_back = data
data = data.decode("utf-8").encode('unicode_escape')
data = data.decode("utf-8").encode("unicode_escape")
logging.info(f"Got: '{data.decode('utf-8')}' from {addr}")
await sock.send_all(b"Got: " + to_send_back)
logging.info(f"Echoed back '{data.decode('utf-8')}' to {addr}")
@ -40,8 +40,7 @@ async def echo_handler(sock: AsyncSocket, addr: tuple):
if __name__ == "__main__":
try:
sched.start(server(('', 1025)))
sched.start(server(("", 1025)))
print("Event loop started")
except KeyboardInterrupt: # Exceptions propagate!
except KeyboardInterrupt: # Exceptions propagate!
print("Exiting...")