Added various exceptions layers

This commit is contained in:
nocturn9x 2020-07-05 06:58:23 +00:00
parent 5c5beeef22
commit fbee6c6f96
10 changed files with 198 additions and 46 deletions

14
LICENSE
View File

@ -185,17 +185,3 @@
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1,3 +1,19 @@
"""
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
__author__ = "Nocturn9x aka Isgiambyy"
__version__ = (0, 0, 1)
from ._core import AsyncScheduler

View File

@ -1,10 +1,26 @@
"""
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Import libraries and internal resources
import types
from collections import deque
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
from heapq import heappush, heappop
import socket
from .exceptions import AlreadyJoinedError, CancelledError
from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy
from timeit import default_timer
from time import sleep as wait
from .socket import AsyncSocket, WantWrite
@ -27,7 +43,7 @@ class AsyncScheduler:
def __init__(self):
"""Object constructor"""
self.to_run = deque() # Tasks that are ready to run
self.tasks = deque() # Tasks that are ready to run
self.paused = [] # Tasks that are asleep
self.selector = DefaultSelector() # Selector object to perform I/O multiplexing
self.current_task = None # This will always point to the currently running coroutine (Task object)
@ -42,44 +58,48 @@ class AsyncScheduler:
give execution control to the loop itself."""
while True:
if not self.selector.get_map() and not any([self.paused, self.to_run]): # If there is nothing to do, just exit
if not self.selector.get_map() and not any([self.paused, self.tasks]): # If there is nothing to do, just exit
break
if not self.to_run and self.paused: # If there are no actively running tasks, we try to schedule the asleep ones
if not self.tasks and self.paused: # If there are no actively running tasks, we try to schedule the asleep ones
wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep until the closest deadline in order not to waste CPU cycles
while self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed
_, __, task = heappop(self.paused)
self.to_run.append(task)
self.tasks.append(task)
if not self.paused:
break
timeout = 0.0 if self.to_run else None # If there are no tasks ready wait indefinitely
timeout = 0.0 if self.tasks else None # If there are no tasks ready wait indefinitely
tasks = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks
for key, _ in tasks:
self.to_run.append(key.data) # Socket ready? Schedule the task
self.tasks.append(key.data) # Socket ready? Schedule the task
self.selector.unregister(
key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now)
while self.to_run: # While there are tasks to run
self.current_task = self.to_run.popleft() # Sets the currently running task
while self.tasks: # While there are tasks to run
self.current_task = self.tasks.popleft() # Sets the currently running task
try:
method, *args = self.current_task.run() # Run a single step with the calculation
getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;)
except CancelledError as cancelled: # Coroutine was cancelled
task = cancelled.args[0]
task.cancelled = True
self.reschedule_parent()
self.tasks.append(self.current_task)
except RuntimeError:
self.reschedule_parent()
except StopIteration as e: # Coroutine ends
self.current_task.result = e.args[0] if e.args else None
self.current_task.finished = True
self.reschedule_parent()
except CancelledError: # Coroutine was cancelled
self.current_task.cancelled = True
self.reschedule_parent()
except Exception as error: # Coroutine raised
self.current_task.exc = error
self.reschedule_parent()
raise # Find a better way to propagate errors
raise # Maybe find a better way to propagate errors?
def create_task(self, coro: types.coroutine):
"""Spawns a child task"""
task = Task(coro)
self.to_run.append(task)
self.tasks.append(task)
return task
def start(self, coro: types.coroutine):
@ -94,17 +114,30 @@ class AsyncScheduler:
popped = self.joined.pop(self.current_task, None)
if popped:
self.to_run.append(popped)
self.tasks.append(popped)
def want_read(self, sock: socket.socket):
"""Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing"""
self.selector.register(sock, EVENT_READ, self.current_task)
busy = False
try:
self.selector.register(sock, EVENT_READ, self.current_task)
except KeyError:
busy = True
if busy:
raise ResourceBusy("The given resource is busy!")
def want_write(self, sock: socket.socket):
"""Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing"""
self.selector.register(sock, EVENT_WRITE, self.current_task)
busy = False
try:
self.selector.register(sock, EVENT_WRITE, self.current_task)
except KeyError:
busy = True
if busy:
raise ResourceBusy("The given resource is busy!")
def join(self, coro: types.coroutine):
"""Handler for the 'join' event, does some magic to tell the scheduler
@ -128,7 +161,7 @@ class AsyncScheduler:
in order to stop it from executing. The loop continues to execute as tasks
are independent"""
task.throw(CancelledError)
task.throw(CancelledError(task))
def wrap_socket(self, sock):
"""Wraps a standard socket into an AsyncSocket object"""

View File

@ -1,3 +1,19 @@
"""
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import types
from ._traps import join, cancel

View File

@ -1,3 +1,19 @@
"""
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from ._core import AsyncScheduler
from types import coroutine

View File

@ -1,3 +1,19 @@
"""
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""Helper methods to interact with the event loop"""
import types

View File

@ -1,3 +1,19 @@
"""
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
class GiambioError(Exception):
"""Base class for gaimbio exceptions"""
pass
@ -12,3 +28,19 @@ class CancelledError(BaseException):
def __repr__(self):
return "giambio.exceptions.CancelledError"
class ResourceBusy(GiambioError):
"""Exception that is raised when a resource is accessed by more than
one task at a time"""
pass
class BrokenPipeError(GiambioError):
"""Wrapper around the broken pipe socket.error"""
pass
class ResourceClosed(GiambioError):
"""Raised when I/O is attempted on a closed fd"""
pass

View File

@ -2,10 +2,24 @@
Basic abstraction layer for giambio asynchronous sockets
Copyright (C) 2020 nocturn9x
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import socket
from .exceptions import ResourceClosed
try:
from ssl import SSLWantReadError, SSLWantWriteError
WantRead = (BlockingIOError, InterruptedError, SSLWantReadError)
@ -26,18 +40,36 @@ class AsyncSocket(object):
async def receive(self, max_size: int):
"""Receives up to max_size from a socket asynchronously"""
return await self.loop.read_sock(self.sock, max_size)
closed = False
try:
return await self.loop.read_sock(self.sock, max_size)
except OSError:
closed = True
if closed:
raise ResourceClosed("I/O operation on closed socket")
async def accept(self):
"""Accepts the socket, completing the 3-step TCP handshake asynchronously"""
to_wrap = await self.loop.accept_sock(self.sock)
return self.loop.wrap_socket(to_wrap[0]), to_wrap[1]
closed = False
try:
to_wrap = await self.loop.accept_sock(self.sock)
return self.loop.wrap_socket(to_wrap[0]), to_wrap[1]
except OSError:
closed = True
if closed:
raise ResourceClosed("I/O operation on closed socket")
async def send_all(self, data: bytes):
"""Sends all data inside the buffer asynchronously until it is empty"""
return await self.loop.sock_sendall(self.sock, data)
closed = False
try:
return await self.loop.sock_sendall(self.sock, data)
except OSError:
closed = True
if closed:
raise ResourceClosed("I/O operation on closed socket")
async def close(self):
"""Closes the socket asynchronously"""
@ -47,7 +79,13 @@ class AsyncSocket(object):
async def connect(self, addr: tuple):
"""Connects the socket to an endpoint"""
await self.loop.connect_sock(self.sock, addr)
closed = False
try:
await self.loop.connect_sock(self.sock, addr)
except OSError:
closed = True
if closed:
raise ResourceClosed("I/O operation on closed socket")
def __enter__(self):
return self.sock.__enter__()

View File

@ -6,8 +6,6 @@ async def countdown(n: int):
print(f"Down {n}")
n -= 1
await sleep(1)
if n == 5:
raise ValueError('lul')
print("Countdown over")
@ -21,17 +19,17 @@ async def countup(stop, step: int or float = 1):
async def main():
cup = scheduler.create_task(countdown(10))
cdown = scheduler.create_task(countup(5, 2))
cdown = scheduler.create_task(countdown(10))
cup = scheduler.create_task(countup(5, 2))
print("Counters started, awaiting completion")
await sleep(2)
print("Slept 1 second, killing countdown")
await cdown.cancel()
await cup.join()
await cdown.join()
print("Task execution complete")
if __name__ == "__main__":
scheduler = AsyncScheduler()
try:
scheduler.start(main())
except Exception:
print("main() errored!")
scheduler.start(main())

View File

@ -40,6 +40,7 @@ async def echo_handler(sock: AsyncSocket, addr: tuple):
if __name__ == "__main__":
try:
sched.start(server(('', 25000)))
sched.start(server(('', 25001)))
except KeyboardInterrupt: # Exceptions propagate!
print("Exiting...")