diff --git a/LICENSE b/LICENSE index 261eeb9..3c4dfe0 100644 --- a/LICENSE +++ b/LICENSE @@ -185,17 +185,3 @@ file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/giambio/__init__.py b/giambio/__init__.py index 40da333..36f63c0 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -1,3 +1,19 @@ +""" + Copyright (C) 2020 nocturn9x + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + __author__ = "Nocturn9x aka Isgiambyy" __version__ = (0, 0, 1) from ._core import AsyncScheduler diff --git a/giambio/_core.py b/giambio/_core.py index e1f4835..685909d 100644 --- a/giambio/_core.py +++ b/giambio/_core.py @@ -1,10 +1,26 @@ +""" + Copyright (C) 2020 nocturn9x + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + # Import libraries and internal resources import types from collections import deque from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from heapq import heappush, heappop import socket -from .exceptions import AlreadyJoinedError, CancelledError +from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy from timeit import default_timer from time import sleep as wait from .socket import AsyncSocket, WantWrite @@ -27,7 +43,7 @@ class AsyncScheduler: def __init__(self): """Object constructor""" - self.to_run = deque() # Tasks that are ready to run + self.tasks = deque() # Tasks that are ready to run self.paused = [] # Tasks that are asleep self.selector = DefaultSelector() # Selector object to perform I/O multiplexing self.current_task = None # This will always point to the currently running coroutine (Task object) @@ -42,44 +58,48 @@ class AsyncScheduler: give execution control to the loop itself.""" while True: - if not self.selector.get_map() and not any([self.paused, self.to_run]): # If there is nothing to do, just exit + if not self.selector.get_map() and not any([self.paused, self.tasks]): # If there is nothing to do, just exit break - if not self.to_run and self.paused: # If there are no actively running tasks, we try to schedule the asleep ones + if not self.tasks and self.paused: # If there are no actively running tasks, we try to schedule the asleep ones wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep until the closest deadline in order not to waste CPU cycles while self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed _, __, task = heappop(self.paused) - self.to_run.append(task) + self.tasks.append(task) if not self.paused: break - timeout = 0.0 if self.to_run else None # If there are no tasks ready wait indefinitely + timeout = 0.0 if self.tasks else None # If there are no tasks ready wait indefinitely tasks = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks for key, _ in tasks: - self.to_run.append(key.data) # Socket ready? Schedule the task + self.tasks.append(key.data) # Socket ready? Schedule the task self.selector.unregister( key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now) - while self.to_run: # While there are tasks to run - self.current_task = self.to_run.popleft() # Sets the currently running task + while self.tasks: # While there are tasks to run + self.current_task = self.tasks.popleft() # Sets the currently running task try: method, *args = self.current_task.run() # Run a single step with the calculation getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) + except CancelledError as cancelled: # Coroutine was cancelled + task = cancelled.args[0] + task.cancelled = True + self.reschedule_parent() + self.tasks.append(self.current_task) + except RuntimeError: + self.reschedule_parent() except StopIteration as e: # Coroutine ends self.current_task.result = e.args[0] if e.args else None self.current_task.finished = True self.reschedule_parent() - except CancelledError: # Coroutine was cancelled - self.current_task.cancelled = True - self.reschedule_parent() except Exception as error: # Coroutine raised self.current_task.exc = error self.reschedule_parent() - raise # Find a better way to propagate errors + raise # Maybe find a better way to propagate errors? def create_task(self, coro: types.coroutine): """Spawns a child task""" task = Task(coro) - self.to_run.append(task) + self.tasks.append(task) return task def start(self, coro: types.coroutine): @@ -94,17 +114,30 @@ class AsyncScheduler: popped = self.joined.pop(self.current_task, None) if popped: - self.to_run.append(popped) + self.tasks.append(popped) def want_read(self, sock: socket.socket): """Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing""" - self.selector.register(sock, EVENT_READ, self.current_task) + busy = False + try: + self.selector.register(sock, EVENT_READ, self.current_task) + except KeyError: + busy = True + if busy: + raise ResourceBusy("The given resource is busy!") def want_write(self, sock: socket.socket): """Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing""" - self.selector.register(sock, EVENT_WRITE, self.current_task) + + busy = False + try: + self.selector.register(sock, EVENT_WRITE, self.current_task) + except KeyError: + busy = True + if busy: + raise ResourceBusy("The given resource is busy!") def join(self, coro: types.coroutine): """Handler for the 'join' event, does some magic to tell the scheduler @@ -128,7 +161,7 @@ class AsyncScheduler: in order to stop it from executing. The loop continues to execute as tasks are independent""" - task.throw(CancelledError) + task.throw(CancelledError(task)) def wrap_socket(self, sock): """Wraps a standard socket into an AsyncSocket object""" diff --git a/giambio/_layers.py b/giambio/_layers.py index 8ee0202..d693274 100644 --- a/giambio/_layers.py +++ b/giambio/_layers.py @@ -1,3 +1,19 @@ +""" + Copyright (C) 2020 nocturn9x + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + import types from ._traps import join, cancel diff --git a/giambio/_run.py b/giambio/_run.py index 285ca1c..2b62fb6 100644 --- a/giambio/_run.py +++ b/giambio/_run.py @@ -1,3 +1,19 @@ +""" + Copyright (C) 2020 nocturn9x + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + from ._core import AsyncScheduler from types import coroutine diff --git a/giambio/_traps.py b/giambio/_traps.py index bed64ad..ceef6a9 100644 --- a/giambio/_traps.py +++ b/giambio/_traps.py @@ -1,3 +1,19 @@ +""" + Copyright (C) 2020 nocturn9x + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + """Helper methods to interact with the event loop""" import types diff --git a/giambio/exceptions.py b/giambio/exceptions.py index dd6ca5c..8f22566 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -1,3 +1,19 @@ +""" + Copyright (C) 2020 nocturn9x + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + class GiambioError(Exception): """Base class for gaimbio exceptions""" pass @@ -12,3 +28,19 @@ class CancelledError(BaseException): def __repr__(self): return "giambio.exceptions.CancelledError" + + +class ResourceBusy(GiambioError): + """Exception that is raised when a resource is accessed by more than + one task at a time""" + pass + + +class BrokenPipeError(GiambioError): + """Wrapper around the broken pipe socket.error""" + pass + + +class ResourceClosed(GiambioError): + """Raised when I/O is attempted on a closed fd""" + pass diff --git a/giambio/socket.py b/giambio/socket.py index f98068b..efdeac3 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -2,10 +2,24 @@ Basic abstraction layer for giambio asynchronous sockets +Copyright (C) 2020 nocturn9x + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. """ import socket +from .exceptions import ResourceClosed try: from ssl import SSLWantReadError, SSLWantWriteError WantRead = (BlockingIOError, InterruptedError, SSLWantReadError) @@ -26,18 +40,36 @@ class AsyncSocket(object): async def receive(self, max_size: int): """Receives up to max_size from a socket asynchronously""" - return await self.loop.read_sock(self.sock, max_size) + closed = False + try: + return await self.loop.read_sock(self.sock, max_size) + except OSError: + closed = True + if closed: + raise ResourceClosed("I/O operation on closed socket") async def accept(self): """Accepts the socket, completing the 3-step TCP handshake asynchronously""" - to_wrap = await self.loop.accept_sock(self.sock) - return self.loop.wrap_socket(to_wrap[0]), to_wrap[1] + closed = False + try: + to_wrap = await self.loop.accept_sock(self.sock) + return self.loop.wrap_socket(to_wrap[0]), to_wrap[1] + except OSError: + closed = True + if closed: + raise ResourceClosed("I/O operation on closed socket") async def send_all(self, data: bytes): """Sends all data inside the buffer asynchronously until it is empty""" - return await self.loop.sock_sendall(self.sock, data) + closed = False + try: + return await self.loop.sock_sendall(self.sock, data) + except OSError: + closed = True + if closed: + raise ResourceClosed("I/O operation on closed socket") async def close(self): """Closes the socket asynchronously""" @@ -47,7 +79,13 @@ class AsyncSocket(object): async def connect(self, addr: tuple): """Connects the socket to an endpoint""" - await self.loop.connect_sock(self.sock, addr) + closed = False + try: + await self.loop.connect_sock(self.sock, addr) + except OSError: + closed = True + if closed: + raise ResourceClosed("I/O operation on closed socket") def __enter__(self): return self.sock.__enter__() diff --git a/tests/count.py b/tests/count.py index ec8aa53..451a967 100644 --- a/tests/count.py +++ b/tests/count.py @@ -6,8 +6,6 @@ async def countdown(n: int): print(f"Down {n}") n -= 1 await sleep(1) - if n == 5: - raise ValueError('lul') print("Countdown over") @@ -21,17 +19,17 @@ async def countup(stop, step: int or float = 1): async def main(): - cup = scheduler.create_task(countdown(10)) - cdown = scheduler.create_task(countup(5, 2)) + cdown = scheduler.create_task(countdown(10)) + cup = scheduler.create_task(countup(5, 2)) print("Counters started, awaiting completion") + await sleep(2) + print("Slept 1 second, killing countdown") + await cdown.cancel() await cup.join() await cdown.join() print("Task execution complete") if __name__ == "__main__": scheduler = AsyncScheduler() - try: - scheduler.start(main()) - except Exception: - print("main() errored!") + scheduler.start(main()) diff --git a/tests/server.py b/tests/server.py index ee2dcb5..6a027e1 100644 --- a/tests/server.py +++ b/tests/server.py @@ -40,6 +40,7 @@ async def echo_handler(sock: AsyncSocket, addr: tuple): if __name__ == "__main__": try: - sched.start(server(('', 25000))) + sched.start(server(('', 25001))) except KeyboardInterrupt: # Exceptions propagate! print("Exiting...") +