diff --git a/giambio/__init__.py b/giambio/__init__.py index 36f63c0..c5a997e 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -1,23 +1,25 @@ """ - Copyright (C) 2020 nocturn9x +Copyright (C) 2020 nocturn9x - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. """ __author__ = "Nocturn9x aka Isgiambyy" -__version__ = (0, 0, 1) +__version__ = (1, 0, 0) from ._core import AsyncScheduler from .exceptions import GiambioError, AlreadyJoinedError, CancelledError from ._traps import sleep +from ._layers import Event + +__all__ = ["AsyncScheduler", "GiambioError", "AlreadyJoinedError", "CancelledError", "sleep", "Event"] -__all__ = ["AsyncScheduler", "GiambioError", "AlreadyJoinedError", "CancelledError", "TaskManager", "sleep"] diff --git a/giambio/_core.py b/giambio/_core.py index adfc227..b44b54e 100644 --- a/giambio/_core.py +++ b/giambio/_core.py @@ -23,7 +23,7 @@ import socket from .exceptions import AlreadyJoinedError, CancelledError, ResourceBusy from timeit import default_timer from time import sleep as wait -from .socket import AsyncSocket, WantWrite +from .socket import AsyncSocket, WantWrite, WantRead from ._layers import Task from socket import SOL_SOCKET, SO_ERROR from ._traps import want_read, want_write @@ -50,6 +50,8 @@ class AsyncScheduler: self.joined = {} # Maps child tasks that need to be joined their respective parent task self.clock = default_timer # Monotonic clock to keep track of elapsed time reliably self.sequence = 0 # A monotonically increasing ID to avoid some corner cases with deadlines comparison + self.events = {} # All Event objects + self.event_waiting = {} # Coroutines waiting on event objects def run(self): """Starts the loop and 'listens' for events until there are either ready or asleep tasks @@ -58,39 +60,69 @@ class AsyncScheduler: give execution control to the loop itself.""" while True: - if not self.selector.get_map() and not any([self.paused, self.tasks]): # If there is nothing to do, just exit + if not self.selector.get_map() and not any([self.paused, self.tasks, self.event_waiting]): # If there is nothing to do, just exit break - if not self.tasks and self.paused: # If there are no actively running tasks, we try to schedule the asleep ones - wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep until the closest deadline in order not to waste CPU cycles - while self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed - _, __, task = heappop(self.paused) - self.tasks.append(task) - if not self.paused: - break - timeout = 0.0 if self.tasks else None # If there are no tasks ready wait indefinitely - io_ready = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks - for key, _ in io_ready: - self.tasks.append(key.data) # Socket ready? Schedule the task - self.selector.unregister( - key.fileobj) # Once (re)scheduled, the task does not need to perform I/O multiplexing (for now) + if not self.tasks: + if self.paused: # If there are no actively running tasks, we try to schedule the asleep ones + self.check_sleeping() + if self.selector.get_map(): + self.check_io() while self.tasks: # While there are tasks to run self.current_task = self.tasks.popleft() # Sets the currently running task - self.current_task.status = "run" - try: - method, *args = self.current_task.run() # Run a single step with the calculation - getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) - except CancelledError as cancelled: # Coroutine was cancelled - task = cancelled.args[0] - task.cancelled = True - self.tasks.remove(task) - except StopIteration as e: # Coroutine ends - self.current_task.result = e.args[0] if e.args else None - self.current_task.finished = True - self.reschedule_parent(self.current_task) - except BaseException as error: # Coroutine raised - self.current_task.exc = error - self.reschedule_parent(self.current_task) - raise # Maybe find a better way to propagate errors? + if self.current_task.status == "cancel": # Deferred cancellation + self.current_task.cancelled = True + self.current_task.throw(CancelledError) + else: + self.current_task.status = "run" + try: + method, *args = self.current_task.run(self.current_task._notify) # Run a single step with the calculation + getattr(self, method)(*args) # Sneaky method call, thanks to David Beazley for this ;) + if self.event_waiting: + self.check_events() + except CancelledError as cancelled: + self.tasks.remove(cancelled.args[0]) + except StopIteration as e: # Coroutine ends + self.current_task.result = e.args[0] if e.args else None + self.current_task.finished = True + self.reschedule_parent(self.current_task) + except BaseException as error: # Coroutine raised + self.current_task.exc = error + self.reschedule_parent(self.current_task) + raise # Maybe find a better way to propagate errors? + + def check_events(self): + """Checks for ready or expired events and triggers them""" + + for event, (timeout, _, task) in self.event_waiting.copy().items(): + if event._set: + task._notify = event._notify + self.tasks.append(task) + self.tasks.append(event.notifier) + self.event_waiting.pop(event) + elif timeout and self.clock() > timeout: + event._timeout_expired = True + event._notify = task._notify = None + self.tasks.append(task) + self.tasks.append(event.notifier) + self.event_waiting.pop(event) + + def check_sleeping(self): + """Checks and reschedules sleeping tasks""" + + wait(max(0.0, self.paused[0][0] - self.clock())) # Sleep until the closest deadline in order not to waste CPU cycles + while self.paused[0][0] < self.clock(): # Reschedules tasks when their deadline has elapsed + _, __, task = heappop(self.paused) + self.tasks.append(task) + if not self.paused: + break + + def check_io(self): + """Checks and schedules task to perform I/O""" + + timeout = 0.0 if self.tasks else None # If there are no tasks ready wait indefinitely + io_ready = self.selector.select(timeout) # Get sockets that are ready and schedule their tasks + for key, _ in io_ready: + self.tasks.append(key.data) # Socket ready? Schedule the task def create_task(self, coro: types.coroutine): """Spawns a child task""" @@ -109,15 +141,23 @@ class AsyncScheduler: def reschedule_parent(self, coro): """Reschedules the parent task""" - popped = self.joined.pop(coro, None) - if popped: - self.tasks.append(popped) - return popped + parent = self.joined.pop(coro, None) + if parent: + assert parent not in self.tasks + self.tasks.append(parent) + return parent def want_read(self, sock: socket.socket): """Handler for the 'want_read' event, registers the socket inside the selector to perform I/0 multiplexing""" + self.current_task.status = "I/O" + if self.current_task._last_io: + if self.current_task._last_io == ("READ", sock): + return # Socket is already scheduled! + else: + self.selector.unregister(sock) busy = False + self.current_task._last_io = "READ", sock try: self.selector.register(sock, EVENT_READ, self.current_task) except KeyError: @@ -128,7 +168,14 @@ class AsyncScheduler: def want_write(self, sock: socket.socket): """Handler for the 'want_write' event, registers the socket inside the selector to perform I/0 multiplexing""" + self.current_task.status = "I/O" + if self.current_task._last_io: + if self.current_task._last_io == ("WRITE", sock): + return # Socket is already scheduled! + else: + self.selector.unregister(sock) # modify() causes issues busy = False + self.current_task._last_io = "WRITE", sock try: self.selector.register(sock, EVENT_WRITE, self.current_task) except KeyError: @@ -142,25 +189,52 @@ class AsyncScheduler: coroutine returns or, if an exception gets raised, the exception will get propagated inside the parent task""" + if child.finished: + self.tasks.append(self.current_task) if child not in self.joined: self.joined[child] = self.current_task else: raise AlreadyJoinedError("Joining the same task multiple times is not allowed!") - def sleep(self, seconds): + def sleep(self, seconds: int or float): """Puts the caller to sleep for a given amount of seconds""" + if seconds: + self.sequence += 1 + self.current_task.status = "sleep" + heappush(self.paused, (self.clock() + seconds, self.sequence, self.current_task)) + + def event_set(self, event, value): + """Sets an event""" + + event.notifier = self.current_task + self.events[event] = value + + def event_wait(self, event, timeout): + """Waits for an event""" + self.sequence += 1 - self.current_task.status = "sleep" - heappush(self.paused, (self.clock() + seconds, self.sequence, self.current_task)) + if timeout: + timeout = self.clock() + timeout + else: + timeout = 0 + if self.events.get(event, None): + return self.events.pop(event) + else: + self.event_waiting[event] = timeout, self.sequence, self.current_task + self.event_waiting = dict(sorted(self.event_waiting.items())) def cancel(self, task): """Handler for the 'cancel' event, throws CancelledError inside a coroutine in order to stop it from executing. The loop continues to execute as tasks are independent""" - self.reschedule_parent(task) - task.throw(CancelledError(task)) + if task.status in ("sleep", "I/O") and not task.cancelled: # It is safe to cancel a task while blocking + task.cancelled = True + task.throw(CancelledError(task)) + elif task.status == "run": + task.status = "cancel" + self.reschedule_parent() def wrap_socket(self, sock): """Wraps a standard socket into an AsyncSocket object""" diff --git a/giambio/_layers.py b/giambio/_layers.py index ba908f7..2ea883e 100644 --- a/giambio/_layers.py +++ b/giambio/_layers.py @@ -1,22 +1,21 @@ """ - Copyright (C) 2020 nocturn9x +Copyright (C) 2020 nocturn9x - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. """ import types -from ._traps import join, cancel - +from ._traps import join, cancel, event_set, event_wait class Task: @@ -28,7 +27,9 @@ class Task: self.exc = None self.result = None self.finished = False - self.status = "init" + self.status = "init" # This is useful for cancellation + self._last_io = None + self._notify = None def run(self, what=None): """Simple abstraction layer over the coroutines ``send`` method""" @@ -54,3 +55,33 @@ class Task: """Implements repr(self)""" return f"Task({self.coroutine}, cancelled={self.cancelled}, exc={repr(self.exc)}, result={self.result}, finished={self.finished}, status={self.status})" + + +class Event: + """A class designed similarly to threading.Event, but with more features""" + + def __init__(self, loop): + """Object constructor""" + + self._set = False + self._notify = None + self.notifier = loop.current_task + self._timeout_expired = False + self.event_caught = False + self.timeout = None + + async def set(self, value=None): + """Sets the event, optionally taking a value. This can be used + to control tasks' flow by 'sending' commands back and fort""" + + self._set = True + self._notify = value + await event_set(self, value) + + async def pause(self, timeout=0): + """Waits until the event is set and returns a value""" + + msg = await event_wait(self, timeout) + if not self._timeout_expired: + self.event_caught = True + return msg diff --git a/giambio/_run.py b/giambio/_run.py index 2b62fb6..44c6fa1 100644 --- a/giambio/_run.py +++ b/giambio/_run.py @@ -1,17 +1,17 @@ """ - Copyright (C) 2020 nocturn9x +Copyright (C) 2020 nocturn9x - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. """ from ._core import AsyncScheduler diff --git a/giambio/_traps.py b/giambio/_traps.py index ceef6a9..4ed9b31 100644 --- a/giambio/_traps.py +++ b/giambio/_traps.py @@ -1,17 +1,17 @@ """ - Copyright (C) 2020 nocturn9x +Copyright (C) 2020 nocturn9x - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. """ """Helper methods to interact with the event loop""" @@ -63,7 +63,7 @@ def cancel(task): """ yield "cancel", task - assert task.cancelled + assert task.cancelled, f"Coroutine ignored CancelledError" @types.coroutine @@ -87,3 +87,15 @@ def want_write(sock: socket.socket): yield "want_write", sock + +@types.coroutine +def event_set(event, value): + + yield "event_set", event, value + + +@types.coroutine +def event_wait(event, timeout: int): + + msg = yield "event_wait", event, timeout + return msg diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 8f22566..96274af 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -1,17 +1,17 @@ """ - Copyright (C) 2020 nocturn9x +Copyright (C) 2020 nocturn9x - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. """ class GiambioError(Exception): diff --git a/giambio/socket.py b/giambio/socket.py index 7f7245d..83b5628 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -37,57 +37,46 @@ class AsyncSocket(object): self.sock = sock self.sock.setblocking(False) self.loop = loop + self._closed = False async def receive(self, max_size: int): """Receives up to max_size from a socket asynchronously""" - closed = False - try: - return await self.loop.read_sock(self.sock, max_size) - except OSError: - closed = True - if closed: + if self._closed: raise ResourceClosed("I/O operation on closed socket") + self.loop.current_task.status = "I/O" + return await self.loop.read_sock(self.sock, max_size) async def accept(self): """Accepts the socket, completing the 3-step TCP handshake asynchronously""" - closed = False - try: - to_wrap = await self.loop.accept_sock(self.sock) - return self.loop.wrap_socket(to_wrap[0]), to_wrap[1] - except OSError: - closed = True - if closed: + if self._closed: raise ResourceClosed("I/O operation on closed socket") + to_wrap = await self.loop.accept_sock(self.sock) + return self.loop.wrap_socket(to_wrap[0]), to_wrap[1] async def send_all(self, data: bytes): """Sends all data inside the buffer asynchronously until it is empty""" - closed = False - try: - return await self.loop.sock_sendall(self.sock, data) - except OSError: - closed = True - if closed: + if self._closed: raise ResourceClosed("I/O operation on closed socket") + return await self.loop.sock_sendall(self.sock, data) async def close(self): """Closes the socket asynchronously""" + if self._closed: + raise ResourceClosed("I/O operation on closed socket") await sleep(0) # Give the scheduler the time to unregister the socket first await self.loop.close_sock(self.sock) + self._closed = True async def connect(self, addr: tuple): """Connects the socket to an endpoint""" - closed = False - try: - await self.loop.connect_sock(self.sock, addr) - except OSError: - closed = True - if closed: + if self._closed: raise ResourceClosed("I/O operation on closed socket") + await self.loop.connect_sock(self.sock, addr) def __enter__(self): return self.sock.__enter__() diff --git a/tests/events.py b/tests/events.py new file mode 100644 index 0000000..fdc59c6 --- /dev/null +++ b/tests/events.py @@ -0,0 +1,35 @@ +import giambio + + +async def child(notifier: giambio.Event, timeout: int): + print("[child] Child is alive!") + if timeout: + print(f"[child] Waiting for events for up to {timeout} seconds") + else: + print("[child] Waiting for events") + notification = await notifier.pause(timeout=timeout) + if notifier._timeout_expired: + print("[child] Parent was too slow!") + else: + print(f"[child] Parent said: {notification}") + + +async def parent(pause: int = 1, child_timeout: int = 0): + event = giambio.Event(scheduler) + print("[parent] Spawning child task") + task = scheduler.create_task(child(event, child_timeout)) + print(f"[parent] Sleeping {pause} second(s) before setting the event") + await giambio.sleep(pause) + print("[parent] Event set") + await event.set("Hi, my child") + if not event.event_caught: + print("[parent] Event not delivered, the timeout has expired") + else: + print("[parent] Event delivered") + await task.join() + print("[parent] Child exited") + + +if __name__ == "__main__": + scheduler = giambio.AsyncScheduler() + scheduler.start(parent(4, 5))