diff --git a/LICENSE b/LICENSE index 71adf4e..38c8310 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ Apache License Version 2.0, January 2004 - http://www.apache.org/licenses/ + https://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION diff --git a/giambio/__init__.py b/giambio/__init__.py index 084109b..b94bed8 100644 --- a/giambio/__init__.py +++ b/giambio/__init__.py @@ -7,7 +7,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/giambio/context.py b/giambio/context.py index ad7d01e..8c355de 100644 --- a/giambio/context.py +++ b/giambio/context.py @@ -7,7 +7,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -16,10 +16,9 @@ See the License for the specific language governing permissions and limitations under the License. """ -from lib2to3.pgen2.token import OP -import types import giambio -from typing import List, Optional +from giambio.task import Task +from typing import List, Optional, Callable, Coroutine, Any class TaskManager: @@ -33,13 +32,13 @@ class TaskManager: :type raise_on_timeout: bool, optional """ - def __init__(self, timeout: float = None, raise_on_timeout: bool = True) -> None: + def __init__(self, current_task: Task, timeout: float = None, raise_on_timeout: bool = False) -> None: """ Object constructor """ # All the tasks that belong to this pool - self.tasks: List[giambio.task.Task] = [] + self.tasks: List[Task] = [] # Whether we have been cancelled or not self.cancelled: bool = False # The clock time of when we started running, used for @@ -52,12 +51,21 @@ class TaskManager: self.timeout = None # Whether our timeout expired or not self.timed_out: bool = False + # Internal check so users don't try + # to use the pool manually self._proper_init = False + # We keep track of any inner pools to propagate + # exceptions properly self.enclosed_pool: Optional["giambio.context.TaskManager"] = None + # Do we raise an error after timeout? self.raise_on_timeout: bool = raise_on_timeout - self.entry_point: Optional[giambio.Task] = None + # The task that created the pool.
We keep track of + # it because we only cancel ourselves if this task + # errors out (so if the error is caught before reaching + # it we just do nothing) + self.owner: Task = current_task - async def spawn(self, func: types.FunctionType, *args, **kwargs) -> "giambio.task.Task": + async def spawn(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, **kwargs) -> "giambio.task.Task": """ Spawns a child task """ @@ -72,7 +80,6 @@ class TaskManager: """ self._proper_init = True - self.entry_point = await giambio.traps.current_task() return self async def __aexit__(self, exc_type: Exception, exc: Exception, tb): @@ -88,13 +95,14 @@ class TaskManager: # children to exit await task.join() self.tasks.remove(task) - self._proper_init = False - if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout: - return True except giambio.exceptions.TooSlowError: if self.raise_on_timeout: raise - + finally: + self._proper_init = False + if isinstance(exc, giambio.exceptions.TooSlowError) and not self.raise_on_timeout: + return True + async def cancel(self): """ Cancels the pool entirely, iterating over all @@ -112,4 +120,4 @@ class TaskManager: pool have exited, False otherwise """ - return self._proper_init and all([task.done() for task in self.tasks]) and (True if not self.enclosed_pool else self.enclosed_pool.done()) + return self._proper_init and all([task.done() for task in self.tasks]) diff --git a/giambio/core.py b/giambio/core.py index e6f68d9..ff888c0 100644 --- a/giambio/core.py +++ b/giambio/core.py @@ -7,7 +7,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -17,14 +17,12 @@ limitations under the License. 
""" # Import libraries and internal resources -from lib2to3.pytree import Base -import types from giambio.task import Task from collections import deque from functools import partial from timeit import default_timer from giambio.context import TaskManager -from typing import Callable, List, Optional, Any, Dict +from typing import Callable, List, Optional, Any, Dict, Coroutine from giambio.util.debug import BaseDebugger from giambio.internal import TimeQueue, DeadlinesQueue from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE @@ -57,7 +55,7 @@ class AsyncScheduler: :param clock: A callable returning monotonically increasing values at each call, usually using seconds as units, but this is not enforced, defaults to timeit.default_timer - :type clock: :class: types.FunctionType + :type clock: :class: Callable :param debugger: A subclass of giambio.util.BaseDebugger or None if no debugging output is desired, defaults to None :type debugger: :class: giambio.util.BaseDebugger @@ -74,7 +72,7 @@ class AsyncScheduler: def __init__( self, - clock: types.FunctionType = default_timer, + clock: Callable = default_timer, debugger: Optional[BaseDebugger] = None, selector: Optional[Any] = None, io_skip_limit: Optional[int] = None, @@ -96,7 +94,7 @@ class AsyncScheduler: or type( "DumbDebugger", (object,), - {"__getattr__": lambda *_: lambda *_: None}, + {"__getattr__": lambda *args: lambda *arg: None}, )() ) # All tasks the loop has @@ -108,7 +106,7 @@ class AsyncScheduler: # This will always point to the currently running coroutine (Task object) self.current_task: Optional[Task] = None # Monotonic clock to keep track of elapsed time reliably - self.clock: types.FunctionType = clock + self.clock: Callable = clock # Tasks that are asleep self.paused: TimeQueue = TimeQueue(self.clock) # Have we ever ran? @@ -131,7 +129,6 @@ class AsyncScheduler: self.entry_point: Optional[Task] = None # Suspended tasks self.suspended: deque = deque() - def __repr__(self): """ @@ -153,8 +150,6 @@ class AsyncScheduler: "_data", "io_skip_limit", "io_max_timeout", - "suspended", - "entry_point" } data = ", ".join( name + "=" + str(value) for name, value in zip(fields, (getattr(self, field) for field in fields)) @@ -211,10 +206,7 @@ class AsyncScheduler: # after it is set, but it makes the implementation easier if not self.current_pool and self.current_task.pool: self.current_pool = self.current_task.pool - pool = self.current_pool - while pool: - self.deadlines.put(pool) - pool = self.current_pool.enclosed_pool + self.deadlines.put(self.current_pool) # If there are no actively running tasks, we start by # checking for I/O. This method will wait for I/O until # the closest deadline to avoid starving sleeping tasks @@ -241,7 +233,6 @@ class AsyncScheduler: # represents the return value of the coroutine, if any. Of course this # exception is not an error and we should happily keep going after it, # most of this code below is just useful for internal/debugging purposes - self.current_task.status = "end" self.current_task.result = ret.value self.current_task.finished = True self.join(self.current_task) @@ -253,22 +244,20 @@ class AsyncScheduler: self.current_task.exc = err self.join(self.current_task) - - def create_task(self, corofunc: types.FunctionType, pool, *args, **kwargs) -> Task: + def create_task(self, coro: Coroutine[Any, Any, Any], pool) -> Task: """ Creates a task from a coroutine function and schedules it to run. 
The associated pool that spawned said task is also needed, while any extra keyword or positional arguments are passed to the function itself - :param corofunc: The coroutine function (not a coroutine!) to - spawn - :type corofunc: function + :param coro: The coroutine to spawn + :type coro: Coroutine[Any, Any, Any] :param pool: The giambio.context.TaskManager object that spawned the task """ - task = Task(corofunc.__name__ or str(corofunc), corofunc(*args, **kwargs), pool) + task = Task(coro.__name__ or str(coro), coro, pool) task.next_deadline = pool.timeout or 0.0 task.joiners = {self.current_task} self._data[self.current_task] = task @@ -299,15 +288,9 @@ class AsyncScheduler: # We need to make sure we don't try to execute # exited tasks that are on the running queue return - if self.current_pool: - if self.current_task.pool and self.current_task.pool is not self.current_pool: - self.current_task.pool.enclosed_pool = self.current_pool - else: + if not self.current_pool and self.current_task.pool: self.current_pool = self.current_task.pool - pool = self.current_pool - while pool: - self.deadlines.put(pool) - pool = self.current_pool.enclosed_pool + self.deadlines.put(self.current_pool) self.debugger.before_task_step(self.current_task) # Some debugging and internal chatter here self.current_task.status = "run" @@ -351,7 +334,7 @@ class AsyncScheduler: if self.selector.get_map(): for k in filter( - lambda o: o.data == task, + lambda o: o.data == self.current_task, dict(self.selector.get_map()).values(), ): self.io_release(k.fileobj) @@ -361,16 +344,11 @@ class AsyncScheduler: """ Suspends execution of the current task. This is basically a do-nothing method, since it will not reschedule the task - before returning. The task will stay suspended as long as - something else outside the loop calls a trap to reschedule it. - Any pending I/O for the task is temporarily unscheduled to - avoid some previous network operation to reschedule the task - before it's due + before returning. The task will stay suspended until a timer, + I/O operation or cancellation wakes it up, or until another + running task reschedules it. 
""" - - if self.current_task.last_io or self.current_task.status == "io": - self.io_release_task(self.current_task) - self.current_task.status = "sleep" + self.suspended.append(self.current_task) def reschedule_running(self): @@ -430,32 +408,27 @@ class AsyncScheduler: try: to_call() except StopIteration as ret: - task.status = "end" task.result = ret.value task.finished = True self.join(task) - self.tasks.remove(task) except BaseException as err: task.exc = err self.join(task) - if task in self.tasks: - self.tasks.remove(task) def prune_deadlines(self): """ Removes expired deadlines after their timeout - has expired + has expired and cancels their associated pool """ while self.deadlines and self.deadlines.get_closest_deadline() <= self.clock(): pool = self.deadlines.get() pool.timed_out = True - self.cancel_pool(pool) for task in pool.tasks: - self.join(task) - if pool.entry_point is self.entry_point: - self.handle_task_exit(self.entry_point, partial(self.entry_point.throw, TooSlowError(self.entry_point))) - self.run_ready.append(self.entry_point) + if task is not pool.owner: + self.handle_task_exit(task, partial(task.throw, TooSlowError(self.current_task))) + if pool.raise_on_timeout: + self.handle_task_exit(pool.owner, partial(pool.owner.throw, TooSlowError(self.current_task))) def schedule_tasks(self, tasks: List[Task]): """ @@ -466,8 +439,7 @@ class AsyncScheduler: for task in tasks: self.paused.discard(task) - if task in self.suspended: - self.suspended.remove(task) + self.suspended.remove(task) self.run_ready.extend(tasks) self.reschedule_running() @@ -490,7 +462,6 @@ class AsyncScheduler: self.run_ready.append(task) self.debugger.after_sleep(task, slept) - def get_closest_deadline(self) -> float: """ Gets the closest expiration deadline (asleep tasks, timeouts) @@ -498,7 +469,7 @@ class AsyncScheduler: :return: The closest deadline according to our clock :rtype: float """ - + if not self.deadlines: # If there are no deadlines just wait until the first task wakeup timeout = max(0.0, self.paused.get_closest_deadline() - self.clock()) @@ -551,7 +522,7 @@ class AsyncScheduler: self.run_ready.append(key.data) # Resource ready? Schedule its task self.debugger.after_io(self.clock() - before_time) - def start(self, func: types.FunctionType, *args, loop: bool = True): + def start(self, func: Callable[..., Coroutine[Any, Any, Any]], *args, loop: bool = True): """ Starts the event loop from a sync context. If the loop parameter is false, the event loop will not start listening for events @@ -564,12 +535,9 @@ class AsyncScheduler: self.run_ready.append(entry) self.debugger.on_start() if loop: - try: - self.run() - finally: - self.has_ran = True - self.close() - self.debugger.on_exit() + self.run() + self.has_ran = True + self.debugger.on_exit() def cancel_pool(self, pool: TaskManager) -> bool: """ @@ -621,9 +589,8 @@ class AsyncScheduler: If ensure_done equals False, the loop will cancel ALL running and scheduled tasks and then tear itself down. If ensure_done equals True, which is the default behavior, - this method will raise a GiambioError exception if the loop - hasn't finished running. The state of the event loop is reset - so it can be reused with another run() call + this method will raise a GiambioError if the loop hasn't + finished running. 
""" if ensure_done: @@ -631,16 +598,6 @@ class AsyncScheduler: elif not self.done(): raise GiambioError("event loop not terminated, call this method with ensure_done=False to forcefully exit") self.shutdown() - # We reset the event loop's state - self.tasks = [] - self.entry_point = None - self.current_pool = None - self.current_task = None - self.paused = TimeQueue(self.clock) - self.deadlines = DeadlinesQueue() - self.run_ready = deque() - self.suspended = deque() - def reschedule_joiners(self, task: Task): """ @@ -648,35 +605,49 @@ class AsyncScheduler: given task, if any """ - if task.pool and task.pool.enclosed_pool and not task.pool.enclosed_pool.done(): - return - self.run_ready.extend(task.joiners) + for t in task.joiners: + self.run_ready.append(t) + + # noinspection PyMethodMayBeStatic + def is_pool_done(self, pool: Optional[TaskManager]): + """ + Returns True if a given pool has finished + executing + """ + + while pool: + if not pool.done(): + return False + pool = pool.enclosed_pool + return True def join(self, task: Task): """ - Joins a task to its callers (implicitly the parent + Joins a task to its callers (implicitly, the parent task, but also every other task who called await task.join() on the task object) """ task.joined = True + if any([task.finished, task.cancelled, task.exc]) and task in self.tasks: + self.io_release_task(task) + self.tasks.remove(task) + self.paused.discard(task) if task.finished or task.cancelled: + task.status = "end" if not task.cancelled: + task.status = "cancelled" + # This way join() returns the + # task's return value + for joiner in task.joiners: + self._data[joiner] = task.result self.debugger.on_task_exit(task) - if task.last_io: - self.io_release_task(task) - if task in self.suspended: - self.suspended.remove(task) - # If the pool (including any enclosing pools) has finished executing - # or we're at the first task that kicked the loop, we can safely - # reschedule the parent(s) - if task.pool is None: - return - if task.pool.done(): + # If the pool has finished executing or we're at the first parent + # task that kicked the loop, we can safely reschedule the parent(s) + if self.is_pool_done(task.pool): self.reschedule_joiners(task) + self.reschedule_running() elif task.exc: - if task in self.suspended: - self.suspended.remove(task) task.status = "crashed" if task.exc.__traceback__: # TODO: We might want to do a bit more complex traceback hacking to remove any extra @@ -686,26 +657,35 @@ class AsyncScheduler: if task.exc.__traceback__.tb_next: task.exc.__traceback__ = task.exc.__traceback__.tb_next self.debugger.on_exception_raised(task, task.exc) - if task.pool is None or task is self.entry_point: - # Parent task has no pool, so we propagate - raise task.exc - if self.cancel_pool(task.pool): - # This will reschedule the parent(s) - # only if all the tasks inside the task's - # pool have finished executing, either - # by cancellation, an exception - # or just returned - for t in task.joiners.copy(): - # Propagate the exception - try: - t.throw(task.exc) - except (StopIteration, CancelledError, RuntimeError): - # TODO: Need anything else? - task.joiners.remove(t) - finally: - if t in self.tasks: - self.tasks.remove(t) - self.reschedule_joiners(task) + if task is self.entry_point and not task.pool: + try: + task.throw(task.exc) + except StopIteration: + ... # TODO: ? 
+ except BaseException: + # TODO: No idea what to do here + raise + elif any(map(lambda tk: tk is task.pool.owner, task.joiners)) or task is task.pool.owner: + # We check if the pool's + # owner catches our error + # or not. If they don't, we + # cancel the entire pool, but + # if they do, we do nothing + if task.pool.owner is not task: + self.handle_task_exit(task.pool.owner, partial(task.pool.owner.coroutine.throw, task.exc)) + if any([task.pool.owner.exc, task.pool.owner.cancelled, task.pool.owner.finished]): + for t in task.joiners.copy(): + # Propagate the exception + self.handle_task_exit(t, partial(t.throw, task.exc)) + if any([t.exc, t.finished, t.cancelled]): + task.joiners.remove(t) + for t in task.pool.tasks: + if not t.joined: + self.handle_task_exit(t, partial(t.throw, task.exc)) + if any([t.exc, t.finished, t.cancelled]): + task.joiners.discard(t) + self.reschedule_joiners(task) + self.reschedule_running() def sleep(self, seconds: int or float): """ @@ -747,8 +727,6 @@ class AsyncScheduler: self.io_release_task(task) elif task.status == "sleep": self.paused.discard(task) - if task in self.suspended: - self.suspended.remove(task) try: self.do_cancel(task) except CancelledError as cancel: @@ -764,23 +742,24 @@ class AsyncScheduler: task = cancel.task task.cancel_pending = False task.cancelled = True - task.status = "cancelled" + self.io_release_task(self.current_task) self.debugger.after_cancel(task) self.tasks.remove(task) else: # If the task ignores our exception, we'll # raise it later again task.cancel_pending = True + self.join(task) def register_sock(self, sock, evt_type: str): """ Registers the given socket inside the - selector to perform I/O multiplexing + selector to perform I/0 multiplexing :param sock: The socket on which a read or write operation - has to be performed + has to be performed :param evt_type: The type of event to perform on the given - socket, either "read" or "write" + socket, either "read" or "write" :type evt_type: str """ @@ -814,8 +793,5 @@ class AsyncScheduler: try: self.selector.register(sock, evt, self.current_task) except KeyError: - # The socket is already registered doing something else, we - # modify the socket instead (or maybe not?) - self.selector.modify(sock, evt, self.current_task) - # TODO: Does this break stuff? - # raise ResourceBusy("The given socket is being read/written by another task") from None + # The socket is already registered doing something else + raise ResourceBusy("The given socket is being read/written by another task") from None diff --git a/giambio/exceptions.py b/giambio/exceptions.py index 8d1c56e..dc45242 100644 --- a/giambio/exceptions.py +++ b/giambio/exceptions.py @@ -7,7 +7,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/giambio/internal.py b/giambio/internal.py index 0922471..f09d1b4 100644 --- a/giambio/internal.py +++ b/giambio/internal.py @@ -7,7 +7,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/giambio/io.py b/giambio/io.py index 13cdd5d..1d060f4 100644 --- a/giambio/io.py +++ b/giambio/io.py @@ -7,7 +7,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -15,6 +15,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import warnings import giambio from giambio.exceptions import ResourceClosed @@ -94,6 +95,7 @@ if self._fd == -1: raise ResourceClosed("I/O operation on closed socket") + sent_no = 0 while data: try: sent_no = self.sock.send(data, flags) @@ -160,33 +162,33 @@ # arsed to write a bunch of uninteresting simple socket # methods from scratch, deal with it. - def fileno(self): + async def fileno(self): """ Wrapper socket method """ return self._fd - def settimeout(self, seconds): + async def settimeout(self, seconds): """ Wrapper socket method """ raise RuntimeError("Use with_timeout() to set a timeout") - def gettimeout(self): + async def gettimeout(self): """ Wrapper socket method """ return None - def dup(self): + async def dup(self): """ Wrapper socket method """ - return type(self)(self._socket.dup()) + return type(self)(self.sock.dup()) async def do_handshake(self): """ @@ -308,3 +310,11 @@ def __repr__(self): return f"AsyncSocket({self.sock})" + + def __del__(self): + """ + Socket destructor + """ + + if self._fd != -1: + warnings.warn(f"socket '{self}' was destroyed, but was not closed, leading to a potential resource leak") diff --git a/giambio/runtime.py b/giambio/runtime.py index 8029c71..eb089e8 100644 --- a/giambio/runtime.py +++ b/giambio/runtime.py @@ -7,7 +7,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -23,7 +23,7 @@ from giambio.exceptions import GiambioError from giambio.context import TaskManager from timeit import default_timer from giambio.util.debug import BaseDebugger -from types import FunctionType +from typing import Coroutine, Callable, Any thread_local = threading.local() @@ -41,7 +41,7 @@ def get_event_loop(): raise GiambioError("giambio is not running") from None -def new_event_loop(debugger: BaseDebugger, clock: FunctionType): +def new_event_loop(debugger: BaseDebugger, clock: Callable): """ Associates a new event loop to the current thread and deactivates the old one.
This should not be @@ -62,7 +62,7 @@ def new_event_loop(debugger: BaseDebugger, clock: FunctionType): thread_local.loop = AsyncScheduler(clock, debugger) -def run(func: FunctionType, *args, **kwargs): +def run(func: Callable[[Any, Any], Coroutine[Any, Any, Any]], *args, **kwargs): """ Starts the event loop from a synchronous entry point """ @@ -92,7 +92,7 @@ def create_pool(): Creates an async pool """ - return TaskManager() + return TaskManager(get_event_loop().current_task) def with_timeout(timeout: int or float): @@ -101,10 +101,11 @@ def with_timeout(timeout: int or float): """ assert timeout > 0, "The timeout must be greater than 0" - mgr = TaskManager(timeout) + mgr = TaskManager(get_event_loop().current_task, timeout, True) loop = get_event_loop() if loop.current_task is loop.entry_point: loop.current_pool = mgr + loop.current_task.pool = mgr return mgr @@ -116,8 +117,9 @@ def skip_after(timeout: int or float): """ assert timeout > 0, "The timeout must be greater than 0" - mgr = TaskManager(timeout, False) + mgr = TaskManager(get_event_loop().current_task, timeout) loop = get_event_loop() if loop.current_task is loop.entry_point: loop.current_pool = mgr + loop.current_task.pool = mgr return mgr diff --git a/giambio/socket.py b/giambio/socket.py index 8ef2d8d..aecae4e 100644 --- a/giambio/socket.py +++ b/giambio/socket.py @@ -7,7 +7,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -35,4 +35,4 @@ def socket(*args, **kwargs): constructor """ - return AsyncSocket(_socket.socket(*args, **kwargs)) + return wrap_socket(_socket.socket(*args, **kwargs)) diff --git a/giambio/sync.py b/giambio/sync.py index ee1f600..badd2d7 100644 --- a/giambio/sync.py +++ b/giambio/sync.py @@ -7,7 +7,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/giambio/task.py b/giambio/task.py index 64b6515..047432e 100644 --- a/giambio/task.py +++ b/giambio/task.py @@ -98,7 +98,7 @@ class Task: :type err: Exception """ - # self.exc = err + self.exc = err return self.coroutine.throw(err) async def join(self): @@ -108,8 +108,6 @@ class Task: are propagated as well """ - if task := await giambio.traps.current_task(): - self.joiners.add(task) return await giambio.traps.join(self) async def cancel(self): diff --git a/giambio/traps.py b/giambio/traps.py index 33af4ce..706618a 100644 --- a/giambio/traps.py +++ b/giambio/traps.py @@ -3,7 +3,10 @@ Implementation for all giambio traps, which are hooks into the event loop and allow it to switch tasks. These coroutines are the one and only way to interact with the event loop from the user's perspective, and -the entire library is based on them +the entire library is based on them. 
These low-level +primitives should not be used on their own unless you +know what you're doing: the library already abstracts +them away with more complex object wrappers and functions Copyright (C) 2020 nocturn9x @@ -11,7 +14,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -36,19 +39,20 @@ def create_trap(method, *args): interact with the event loop """ - data = yield method, *args - return data + return (yield method, *args) async def suspend() -> Any: """ - Suspends the current task + Suspends the current task. The + task can still be woken up by + any pending timers, I/O or cancellations """ return await create_trap("suspend") -async def create_task(coro: Callable[..., Coroutine[Any, Any, Any]], pool, *args): +async def create_task(coro: Callable[[Any, Any], Coroutine[Any, Any, Any]], pool, *args, **kwargs): """ Spawns a new task in the current event loop from a bare coroutine function. All extra positional arguments are passed to the function @@ -59,11 +63,11 @@ async def create_task(coro: Callable[..., Coroutine[Any, Any, Any]], pool, *args if inspect.iscoroutine(coro): raise GiambioError( - "Looks like you tried to call giambio.run(your_func(arg1, arg2, ...)), that is wrong!" - "\nWhat you wanna do, instead, is this: giambio.run(your_func, arg1, arg2, ...)" + "Looks like you tried to call pool.create_task(your_func(arg1, arg2, ...)), that is wrong!" + "\nWhat you wanna do, instead, is this: pool.create_task(your_func, arg1, arg2, ...)" ) elif inspect.iscoroutinefunction(coro): - return await create_trap("create_task", coro, pool, *args) + return await create_trap("create_task", coro(*args, **kwargs), pool) else: raise TypeError("coro must be a coroutine function") diff --git a/giambio/util/__init__.py b/giambio/util/__init__.py index 77c5f82..f663e01 100644 --- a/giambio/util/__init__.py +++ b/giambio/util/__init__.py @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/giambio/util/debug.py b/giambio/util/debug.py index 2bd903d..e93012e 100644 --- a/giambio/util/debug.py +++ b/giambio/util/debug.py @@ -7,7 +7,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/tests/cancel.py b/tests/cancel.py index c75a51a..d340a31 100644 --- a/tests/cancel.py +++ b/tests/cancel.py @@ -4,18 +4,25 @@ from debugger import Debugger async def child(name: int): print(f"[child {name}] Child spawned!! 
Sleeping for {name} seconds") - await giambio.sleep(name) + try: + await giambio.sleep(name) + except giambio.exceptions.CancelledError: + # Perform some cleanup + print(f"[child {name}] I have been cancelled!") + raise # Important! Not re-raising the exception *will* break giambio print(f"[child {name}] Had a nice nap!") async def main(): start = giambio.clock() async with giambio.create_pool() as pool: - # await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately! + await pool.spawn(child, 1) # If you comment this line, the pool will exit immediately! task = await pool.spawn(child, 2) print("[main] Children spawned, awaiting completion") await task.cancel() print("[main] Second child cancelled") + # This code always executes, no matter what happens inside the pool (unless an exception + # is raised) print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") diff --git a/tests/chatroom_client.py b/tests/chatroom_client.py index b6d2382..e6c0a9b 100644 --- a/tests/chatroom_client.py +++ b/tests/chatroom_client.py @@ -13,7 +13,6 @@ async def sender(sock: giambio.socket.AsyncSocket, q: giambio.Queue): async def receiver(sock: giambio.socket.AsyncSocket, q: giambio.Queue): data = b"" - buffer = b"" while True: while not data.endswith(b"\n"): data += await sock.receive(1024) diff --git a/tests/chatroom_server.py b/tests/chatroom_server.py index 441b16d..e46d07a 100644 --- a/tests/chatroom_server.py +++ b/tests/chatroom_server.py @@ -46,7 +46,7 @@ async def handler(sock: AsyncSocket, client_address: tuple): address = f"{client_address[0]}:{client_address[1]}" async with sock: # Closes the socket automatically - await sock.send_all(b"Welcome to the chartoom pal, start typing and press enter!\n") + await sock.send_all(b"Welcome to the chatroom pal, start typing and press enter!\n") while True: data = await sock.receive(1024) if not data: diff --git a/tests/echo_server.py b/tests/echo_server.py index 7a79a60..761c1f1 100644 --- a/tests/echo_server.py +++ b/tests/echo_server.py @@ -1,5 +1,6 @@ import giambio from giambio.socket import AsyncSocket +from debugger import Debugger import logging import sys @@ -65,7 +66,7 @@ if __name__ == "__main__": datefmt="%d/%m/%Y %p", ) try: - giambio.run(serve, ("localhost", port)) + giambio.run(serve, ("localhost", port), debugger=()) except (Exception, KeyboardInterrupt) as error: # Exceptions propagate! if isinstance(error, KeyboardInterrupt): logging.info("Ctrl+C detected, exiting") diff --git a/tests/event_timeout.py b/tests/event_timeout.py new file mode 100644 index 0000000..f476285 --- /dev/null +++ b/tests/event_timeout.py @@ -0,0 +1,43 @@ +from debugger import Debugger +import giambio + + +# A test for events + + +async def child(ev: giambio.Event, pause: int): + print("[child] Child is alive! Going to wait until notified") + start_total = giambio.clock() + data = await ev.wait() + end_pause = giambio.clock() - start_total + print(f"[child] Parent set the event with value {data}, exiting in {pause} seconds") + start_sleep = giambio.clock() + await giambio.sleep(pause) + end_sleep = giambio.clock() - start_sleep + end_total = giambio.clock() - start_total + print( + f"[child] Done! Slept for {end_total:.2f} seconds total ({end_pause:.2f} waiting, {end_sleep:.2f} sleeping), nice nap!" 
+ ) + + +async def parent(pause: int = 1): + async with giambio.skip_after(5) as pool: + # The pool created by skip_after will + # just silently keep going after 5 + # seconds and raise no error + event = giambio.Event() + print("[parent] Spawning child task") + await pool.spawn(child, event, pause + 2) + start = giambio.clock() + print(f"[parent] Sleeping {pause} second(s) before setting the event") + await giambio.sleep(pause) + await event.trigger(True) + print("[parent] Event set, awaiting child completion") + end = giambio.clock() - start + if pool.timed_out: + print("[parent] Timeout has expired!") + print(f"[parent] Child exited in {end:.2f} seconds") + + +if __name__ == "__main__": + giambio.run(parent, 2, debugger=()) diff --git a/tests/memory_channel.py b/tests/memory_channel.py index ee2a8e8..3c1c62d 100644 --- a/tests/memory_channel.py +++ b/tests/memory_channel.py @@ -21,11 +21,11 @@ async def receiver(c: giambio.MemoryChannel): async def main(channel: giambio.MemoryChannel, n: int): + print("Starting sender and receiver") async with giambio.create_pool() as pool: await pool.spawn(sender, channel, n) await pool.spawn(receiver, channel) - + print("All done!") -channel = giambio.MemoryChannel(2) -giambio.run(main, channel, 5, debugger=()) \ No newline at end of file +giambio.run(main, giambio.MemoryChannel(2), 5, debugger=()) # 2 is the max size of the channel diff --git a/tests/queue.py b/tests/queue.py index 2e70124..90ec140 100644 --- a/tests/queue.py +++ b/tests/queue.py @@ -30,11 +30,12 @@ async def consumer(q: giambio.Queue): async def main(q: giambio.Queue, n: int): + print("Starting consumer and producer") async with giambio.create_pool() as pool: await pool.spawn(producer, q, n) await pool.spawn(consumer, q) print("Bye!") -queue = giambio.Queue(1) # Queue has size limit of 1 +queue = giambio.Queue(2) # Queue has size limit of 2 giambio.run(main, queue, 5, debugger=()) diff --git a/tests/task_ipc2.py b/tests/task_ipc2.py index 7c6f30c..c8e3f0a 100644 --- a/tests/task_ipc2.py +++ b/tests/task_ipc2.py @@ -29,5 +29,4 @@ async def main(channel: giambio.NetworkChannel, delay: int): print(f"[main] Cleared {await channel.read(1)!r}") -channel = giambio.NetworkChannel() -giambio.run(main, channel, 4, debugger=()) \ No newline at end of file +giambio.run(main, giambio.NetworkChannel(), 4, debugger=()) diff --git a/tests/timeout3.py b/tests/timeout3.py index bb8d9ab..de5c837 100644 --- a/tests/timeout3.py +++ b/tests/timeout3.py @@ -6,29 +6,26 @@ async def child(name: int): print(f"[child {name}] Child spawned!! 
Sleeping for {name} seconds") await giambio.sleep(name) print(f"[child {name}] Had a nice nap!") -<<<<<<< HEAD return name -======= async def worker(coro, *args): try: - async with giambio.with_timeout(10): + async with giambio.with_timeout(2): await coro(*args) except giambio.exceptions.TooSlowError: return True return False ->>>>>>> origin/master async def main(): start = giambio.clock() -<<<<<<< HEAD try: async with giambio.with_timeout(5) as pool: - task = await pool.spawn(child, 2) - print(f"Child has returned: {await task.join()}") - await giambio.sleep(5) # This will trigger the timeout + task = await pool.spawn(worker, child, 5) + print(f"[main] Child has returned: {await task.join()}") + print(f"[main] Sleeping") + await giambio.sleep(500) # This will trigger the timeout except giambio.exceptions.TooSlowError: print("[main] One or more children have timed out!") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") @@ -36,17 +33,4 @@ async def main(): if __name__ == "__main__": - giambio.run(main, debugger=()) -======= - async with giambio.skip_after(10) as pool: - t = await pool.spawn(worker, child, 7) - await giambio.sleep(2) - t2 = await pool.spawn(worker, child, 15) - if any([await t.join(), await t2.join()]): - print("[main] One or more children have timed out!") - print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds") - - -if __name__ == "__main__": - giambio.run(main, debugger=()) ->>>>>>> origin/master + giambio.run(main, debugger=Debugger()) diff --git a/tests/triple_nested_exception.py b/tests/triple_nested_exception.py index 8862abd..7c7c77c 100644 --- a/tests/triple_nested_exception.py +++ b/tests/triple_nested_exception.py @@ -42,7 +42,7 @@ async def main(): print("[main] First 2 children spawned, awaiting completion") async with giambio.create_pool() as a_pool: await a_pool.spawn(child1) - print("[main] Third children spawned, prepare for trouble in 2 seconds") + print("[main] Third children spawned, prepare for delayed trouble in 2 seconds") async with giambio.create_pool() as new_pool: # This pool will be cancelled by the exception # in the outer pool @@ -50,6 +50,7 @@ async def main(): await new_pool.spawn(child3) print("[main] Fourth and fifth children spawned") except Exception as error: + # raise # Because exceptions just *work*! print(f"[main] Exception from child caught! {repr(error)}") print(f"[main] Children execution complete in {giambio.clock() - start:.2f} seconds")
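With these changes every pool records its owner (the task that opened it), and a child's exception is first delivered to that owner; the rest of the pool is cancelled only if the owner lets the error through, as the comments in context.py and core.py describe. The snippet below is a minimal sketch of how calling code is expected to observe this, modelled on tests/triple_nested_exception.py; the task names and sleep values are illustrative, not part of the patch.

import giambio


async def faulty_child():
    # Illustrative child: crashes shortly after starting
    await giambio.sleep(1)
    raise ValueError("ouch!")


async def sibling():
    # Cancelled along with the rest of the pool if the error
    # reaches the pool's owner uncaught
    await giambio.sleep(5)
    print("[sibling] Done sleeping")


async def main():
    try:
        async with giambio.create_pool() as pool:
            await pool.spawn(faulty_child)
            await pool.spawn(sibling)
            print("[main] Children spawned")
    except ValueError as error:
        # The owner (this task) sees the child's exception here
        print(f"[main] Caught {repr(error)}")


if __name__ == "__main__":
    giambio.run(main, debugger=())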
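AsyncSocket also gains a destructor that warns when a socket is garbage-collected without ever being closed. A short sketch of the intended difference follows, assuming the giambio.socket.socket() helper defined in giambio/socket.py and the async context-manager support already used by the chatroom test; the helper task names are illustrative.

import giambio


async def leaky():
    # Never closed: the new __del__ warns about a potential
    # resource leak once the object is collected
    giambio.socket.socket()


async def tidy():
    # Closed automatically on exit from the async with block,
    # so no warning is emitted
    async with giambio.socket.socket():
        pass


async def main():
    async with giambio.create_pool() as pool:
        await pool.spawn(leaky)
        await pool.spawn(tidy)


if __name__ == "__main__":
    giambio.run(main, debugger=())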