Browse Source

Initial commit

master
nocturn9x 9 months ago
parent
commit
2c34afe03c
  1. 8
      .idea/.gitignore
  2. 10
      .idea/asyncevents.iml
  3. 42
      .idea/inspectionProfiles/Project_Default.xml
  4. 6
      .idea/inspectionProfiles/profiles_settings.xml
  5. 4
      .idea/misc.xml
  6. 8
      .idea/modules.xml
  7. 6
      .idea/vcs.xml
  8. 16
      LICENSE
  9. 86
      README.md
  10. 97
      asyncevents/__init__.py
  11. 67
      asyncevents/constants.py
  12. 19
      asyncevents/errors.py
  13. 393
      asyncevents/events.py
  14. 1
      requirements.txt
  15. 32
      setup.py
  16. 27
      tests/on_error.py
  17. 36
      tests/simple_example.py

8
.idea/.gitignore

@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

10
.idea/asyncevents.iml

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

42
.idea/inspectionProfiles/Project_Default.xml

@ -0,0 +1,42 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<Languages>
<language minSize="46" name="Python" />
</Languages>
</inspection_tool>
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="2">
<item index="0" class="java.lang.String" itemvalue="tortoise" />
<item index="1" class="java.lang.String" itemvalue="starlette" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyPep8Inspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="W605" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N813" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="pyrogram.types.user_and_chats.user.User.from_user" />
</list>
</option>
</inspection_tool>
<inspection_tool class="SqlNoDataSourceInspection" enabled="false" level="WARNING" enabled_by_default="false" />
</profile>
</component>

6
.idea/inspectionProfiles/profiles_settings.xml

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

4
.idea/misc.xml

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (asyncevents)" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/asyncevents.iml" filepath="$PROJECT_DIR$/.idea/asyncevents.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/asyncevents" vcs="Git" />
</component>
</project>

16
LICENSE

@ -184,18 +184,4 @@
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
identification within third-party archives.

86
README.md

@ -1,2 +1,84 @@
# asyncevents
asyncevents is a small library to help developers perform asynchronous event handling in modern Python code
# asyncevents - Asynchronous event handling for modern Python
asyncevents is a small library to help developers perform asynchronous event handling in modern Python code.
## Features
- Priority queueing to allow for deterministic event triggering (with a catch, see below)
- Built-in exception handling (optional)
- The public API is fully type hinted for those sweet, sweet editor suggestions
- Public API is fully documented (and some private stuff too): you could write this from scratch in a couple of hours (like I did)
- Very small (~200 CLOC), although it can't fit on a postcard
__Note__: Deterministic event handling can only occur in blocking mode, i.e. when a call to `emit()` blocks until
all event handlers have run. If the non-blocking mode is used, handlers are started according to their priority,
but there's no telling on how they will be further scheduled to run and that depends entirely on the underlying
asyncio event loop
## Limitations
- Only compatible with asyncio due to the fact that other libraries such as trio and curio have wildly different design
goals (structured concurrency, separation of environments, etc.), which makes implementing some functionality
(i.e. the `wait()` method) tricky, if not outright impossible. Also those libraries, especially trio, already have
decent machinery to perform roughly what asyncevent does
- Does not support using any other loop than the currently running one because of some subtleties of modern asyncio
wrappers like `asyncio.run()` which creates its own event loop internally (_Thanks, asyncio_)
- Exceptions are kinda finicky in non-blocking mode due to how `asyncio.gather` works: only the first exception
in a group of handlers is properly raised and log messages might get doubled. Also, exception logging and propagation
is delayed until you `await wait("some_event")` so be careful
## Why?
This library exists because the current alternatives either suck, lack features or are inspired by other languages'
implementations of events like C# and Node.js: asyncevents aims to be a fully Pythonic library that provides just the
features you need and nothing more (nor nothing less).
## Cool! How do I use it?
Like this
```python3
import time
import asyncio
from asyncevents import on_event, emit, wait
@on_event("hello")
async def hello(_, event: str):
print(f"Hello {event!r}!")
@on_event("hi")
async def hi(_, event: str):
print(f"Hi {event!r}! I'm going to sleep for 5 seconds")
await asyncio.sleep(5) # Simulates some work
async def main():
print("Firing blocking event 'hello'")
await emit("hello") # This call blocks until hello() terminates
print("Handlers for event 'hello' have exited")
# Notice how, until here, the output is in order: this is on purpose!
# When using blocking mode, asyncevents even guarantees that handlers
# with different priorities will be executed in order
print("Firing blocking event 'hello'")
await emit("hi", block=False) # This one spawns hi() and returns immediately
print("Non-blocking event 'hello' fired")
await emit("event3") # Does nothing: No handlers registered for event3!
# We wait now for the the handler of the "hi" event to complete
t = time.time()
print("Waiting on event 'hi'")
await wait("hi") # Waits until all the handlers triggered by the "hi" event exit
print(f"Waited for {time.time() - t:.2f} seconds") # Should print roughly 5 seconds
if __name__ == "__main__":
asyncio.run(main())
```
## TODOs
- Documentation
- More tests
- Trio/curio backend (maybe)

97
asyncevents/__init__.py

@ -0,0 +1,97 @@
# Copyright (C) 2021 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from asyncevents import events, errors
from typing import Any, Callable, Coroutine, Optional
from asyncevents.constants import ExceptionHandling, ExecutionMode, UnknownEventHandling
# Thread-local namespace for storing the currently
# used AsyncEventEmitter instance automatically
local_storage: threading.local = threading.local()
AsyncEventEmitter = events.AsyncEventEmitter
def get_current_emitter():
"""
Returns the currently active
emitter in the current thread
and creates a new one if one
doesn't exist
"""
if not hasattr(local_storage, "emitter"):
local_storage.emitter = AsyncEventEmitter()
return local_storage.emitter
def set_current_emitter(emitter: AsyncEventEmitter):
"""
Sets the active emitter in the current thread.
Discards the existing one, if it exists
:param emitter: The new emitter to set
for the current thread
:type emitter: :class: AsyncEventEmitter
"""
if not isinstance(emitter, AsyncEventEmitter) and not issubclass(type(emitter), AsyncEventEmitter):
raise TypeError(
"expected emitter to be an instance of AsyncEventEmitter or a subclass thereof,"
f" found {type(emitter).__name__!r} instead"
)
local_storage.emitter = emitter
async def wait(event: Optional[str] = None):
"""
Shorthand for get_current_emitter().wait()
"""
await get_current_emitter().wait(event)
async def emit(event: str, block: bool = True):
"""
Shorthand for get_current_emitter().emit(event)
"""
await get_current_emitter().emit(event, block)
def on_event(event: str, priority: int = 0, emitter: AsyncEventEmitter = get_current_emitter(), oneshot: bool = False):
"""
Decorator shorthand of emitter.register_event(event, f, priority)
"""
def decorator(corofunc: Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]]):
emitter.register_event(event, corofunc, priority, oneshot)
async def wrapper(*args, **kwargs):
return await corofunc(*args, **kwargs)
return wrapper
return decorator
__all__ = [
"events",
"errors",
"AsyncEventEmitter",
"ExceptionHandling",
"ExecutionMode",
"UnknownEventHandling",
"on_event",
"emit",
"wait",
"get_current_emitter",
"set_current_emitter"
]

67
asyncevents/constants.py

@ -0,0 +1,67 @@
# Copyright (C) 2021 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum, auto, EnumMeta
class ContainsEnumMeta(EnumMeta):
"""
Simple metaclass that implements
the item in self operation
"""
def __contains__(cls, item):
try:
cls(item)
except ValueError:
return False
else:
return True
class ExceptionHandling(Enum, metaclass=ContainsEnumMeta):
"""
Flags that control how exceptions
are handled in asyncevents. Note that,
following Python's conventions, only
subclasses of Exception are caught.
Any subclass of BaseException is ignored
as exceptions deriving directly from it,
aside from Exception itself, are not meant
to be caught
"""
IGNORE: "ExceptionHandling" = auto() # The exception is caught and ignored
LOG: "ExceptionHandling" = auto() # The exception is caught and logged
PROPAGATE: "ExceptionHandling" = auto() # The exception is not caught at all
class UnknownEventHandling(Enum, metaclass=ContainsEnumMeta):
"""
Flags that control how the event emitter
handles events for which no task has been
registered. Note that this only applies to the
emit and the wait methods, and not to
register_handler/unregister_handler
"""
IGNORE: "UnknownEventHandling" = auto() # Do nothing
LOG: "UnknownEventHandling" = auto() # Log it as a warning
ERROR: "UnknownEventHandling" = auto() # raise an UnknownEvent error
class ExecutionMode(Enum, metaclass=ContainsEnumMeta):
"""
Flags that control how the event emitter
spawns tasks
"""
PAUSE: "ExecutionMode" = auto() # Spawn tasks via "await"
NOWAIT: "ExecutionMode" = auto() # Use asyncio.create_task

19
asyncevents/errors.py

@ -0,0 +1,19 @@
# Copyright (C) 2021 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class UnknownEvent(KeyError):
"""
A KeyError subclass that is raised when
an unknown event is emitted (only when the
strategy of unknown event handling is set
to ERROR)
"""

393
asyncevents/events.py

@ -0,0 +1,393 @@
# Copyright (C) 2021 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import sys
import time
import asyncio
from functools import partial
from collections import defaultdict
from asyncevents.errors import UnknownEvent
from heapq import heappush, heapify, heappop
from logging import Logger, getLogger, INFO, Formatter, StreamHandler
from typing import Dict, List, Tuple, Coroutine, Callable, Any, Awaitable, Optional
from asyncevents.constants import ExceptionHandling, UnknownEventHandling, ExecutionMode
class AsyncEventEmitter:
"""
A simple priority-based asynchronous event emitter. In contrast to a scheduler, which
continuously runs in the background orchestrating execution of _tasks, an emitter runs
only when an event is emitted and it only runs the _tasks that are meant to catch said
event, if any.
:param on_error: Tells the emitter what to do when an exception occurs inside an event
handler. This value can either be an entry from the asyncevents.ExceptionHandling
enum or a coroutine object. If the passed object is a coroutine, it is awaited
whenever an exception is caught with the AsyncEventEmitter instance, the exception
object and the event name as arguments (errors from the exception handler itself are
not caught). Defaults to ExceptionHandling.PROPAGATE, which lets exceptions fall trough
the execution chain (other enum values are LOG, which prints a log message on the
logging.ERROR level, and IGNORE which silences the exception entirely)
:type on_error: Union[ExceptionHandling, Callable[[AsyncEventEmitter, Exception, str], Coroutine[Any, Any, Any]]],
optional
:param on_unknown_event: Tells the emitter what to do when an unknown event is triggered. An
unknown event is an event for which no handler is registered (either because it has never
been registered or because all of its handlers have been removed). This value can either be
an entry from the asyncevents.UnknownEventHandling enum or a coroutine object. If the argument
is a coroutine, it is awaited with the AsyncEventEmitter instance and the event name as arguments.
Defaults to UnknownEventHandling.IGNORE, which does nothing (other enum values are LOG, which
prints a log message on the logging.WARNING level, and ERROR which raises an UnknownEvent exception)
Note: if the given callable is a coroutine, it is awaited, while it's called normally otherwise
and its return value is discarded
:type on_unknown_event: Union[UnknownEventHandling, Callable[[AsyncEventEmitter, str], Coroutine[Any, Any, Any]]], optional
:param mode: Tells the emitter how event handlers should be spawned. It should be an entry of the
the asyncevents.ExecutionMode enum. If it is set to ExecutionMode.PAUSE, the default, the event
emitter spawns _tasks by awaiting each matching handler: this causes it to pause on every handler.
If ExecutionMode.NOWAIT is used, the emitter uses asyncio.create_task to spawns all the handlers
at the same time (note though that using this mode kind of breaks the priority queueing: the handlers
are started according to their priorities, but once they are started they are handled by asyncio's
event loop which is non-deterministic, so expect some disorder). Using ExecutionMode.NOWAIT allows
to call the emitter's wait() method, which pauses until all currently running event handlers have
completed executing (when ExecutionMode.PAUSE is used, wait() is a no-op), but note that return
values from event handlers are not returned
:type mode: ExecutionMode
"""
# Implementations for emit()
async def _check_event(self, event: str):
"""
Performs checks about the given event
and raises/logs appropriately before
emitting/waiting on it
:param event: The event name
:type event: str
"""
if self.handlers.get(event, None) is None:
if self.on_unknown_event == UnknownEventHandling.IGNORE:
return
elif self.on_unknown_event == UnknownEventHandling.ERROR:
raise UnknownEvent(f"unknown event {event!r}")
elif self.on_unknown_event == UnknownEventHandling.LOG:
self.logger.warning(f"Attempted to emit or wait on an unknown event {event!r}")
else:
# It's a coroutine! Call it
await self.on_unknown_event(self, event)
async def _catch_errors_in_awaitable(self, event: str, obj: Awaitable):
# Thanks to asyncio's *utterly amazing* (HUGE sarcasm there)
# exception handling, we have to make this wrapper so we can
# catch errors on a per-handler basis
try:
await obj
except Exception as e:
if (event, obj) in self._tasks:
obj: asyncio.Task # Silences PyCharm's warnings
self._tasks.remove((event, obj))
if inspect.iscoroutinefunction(self.on_error):
await self.on_error(self, e, event)
elif self.on_error == ExceptionHandling.PROPAGATE:
raise
elif self.on_error == ExceptionHandling.LOG:
self.logger.error(f"An exception occurred while handling {event!r}: {type(e).__name__} -> {e}")
# Note how the IGNORE case is excluded: we just do nothing after all
async def _emit_nowait(self, event: str):
# This implementation of emit() returns immediately
# and runs the handlers in the background
await self._check_event(event)
temp: List[Tuple[int, float, Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]], bool]] = []
while self.handlers[event]:
# We use heappop because we want the first
# by priority and the heap queue only has
# the guarantee we need for heap[0]
temp.append(heappop(self.handlers[event]))
task = asyncio.create_task(temp[-1][-2](self, event))
if temp[-1][-1]:
task.add_done_callback(
partial(
# The extra argument is the future asyncio passes us,
# which we don't care about
lambda s, ev, corofunc, _: s.unregister_handler(ev, corofunc),
self,
event,
temp[-1][-2],
)
)
self._tasks.append((event, asyncio.create_task(self._catch_errors_in_awaitable(event, task))))
# We push back the elements
for t in temp:
heappush(self.handlers[event], t)
async def _emit_await(self, event: str):
# This implementation of emit() returns only after
# all handlers have finished executing
await self._check_event(event)
temp: List[Tuple[int, float, Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]], bool]] = self.handlers[event].copy()
t: Tuple[int, float, Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]], bool]
while temp:
# We use heappop because we want the first
# by priority and the heap queue only has
# the guarantee we need for heap[0]
t = heappop(temp)
if t[-1]:
self.unregister_handler(event, t[-2])
await self._catch_errors_in_awaitable(event, t[-2](self, event))
def __init__(
self,
# These type hints come from https://stackoverflow.com/a/59177557/12159081
# and should correctly hint a coroutine function
on_error: ExceptionHandling
| Callable[["AsyncEventEmitter", Exception, str], Coroutine[Any, Any, Any]] = ExceptionHandling.PROPAGATE,
on_unknown_event: UnknownEventHandling
| Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]] = UnknownEventHandling.IGNORE,
mode: ExecutionMode = ExecutionMode.PAUSE,
):
"""
Public object constructor
"""
if not inspect.iscoroutinefunction(on_error) and on_error not in ExceptionHandling:
if inspect.iscoroutine(on_unknown_event):
raise TypeError(
"on_unknown_event should be a coroutine *function*, not a coroutine! Pass the function"
" object without calling it!"
)
raise TypeError(
"expected on_error to be a coroutine function or an entry from the ExceptionHandling"
f" enum, found {type(on_error).__name__!r} instead"
)
if inspect.iscoroutinefunction(on_unknown_event) and on_unknown_event not in UnknownEventHandling:
if inspect.iscoroutine(on_unknown_event):
raise TypeError(
"on_unknown_event should be a coroutine *function*, not a coroutine! Pass the function"
" object without calling it!"
)
raise TypeError(
"expected on_unknown_event to be a coroutine function or an entry from the"
f" UnknownEventHandling enum, found {type(on_unknown_event).__name__!r} instead"
)
if mode not in ExecutionMode:
raise TypeError(
f"expected mode to be an entry from the ExecutionMode enum, found {type(mode).__name__!r}" " instead"
)
self.on_error = on_error
self.on_unknown_event = on_unknown_event
self.mode = mode
# Determines the implementation of emit()
# and wait() according to the provided
# settings and the current Python version
if self.mode == ExecutionMode.PAUSE:
self._emit_impl = self._emit_await
else:
self._emit_impl = self._emit_nowait
self.logger: Logger = getLogger("asyncevents")
self.logger.handlers = []
self.logger.setLevel(INFO)
handler: StreamHandler = StreamHandler(sys.stderr)
handler.setFormatter(Formatter(fmt="[%(levelname)s - %(asctime)s] %(message)s", datefmt="%d/%m/%Y %H:%M:%S %p"))
self.logger.addHandler(handler)
# Stores asyncio tasks so that wait() can call
# asyncio.gather() on them
self._tasks: List[Tuple[str, asyncio.Task]] = []
# Stores events and their priorities. Each
# entry in the dictionary is a (name, handlers)
# tuple where name is a string and handlers is
# list of tuples. Each tuple in the list contains
# a priority (defaults to 0), the insertion time of
# when the handler was registered (to act as a tie
# breaker for _tasks with identical priorities or
# when priorities aren't used at all) a coroutine
# function object and a boolean that signals if the
# handler is to be deleted after it fires the first
# time (aka 'oneshot')
self.handlers: Dict[
str, List[Tuple[int, float, Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]], bool]]
] = defaultdict(list)
def exists(self, event: str) -> bool:
"""
Returns if the given event has at least
one handler registered
:param event: The event name
:type event: str
:return: True if the event has at least one handler,
False otherwise
"""
return len(self.handlers.get(event)) > 0
def register_event(
self,
event: str,
handler: Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]],
priority: int = 0,
oneshot: bool = False,
):
"""
Registers an event and its associated handler. If
the event is already registered, the given handler
is added to the already existing handler queue. Each
event will be called with the AsyncEventEmitter instance
that triggered the event as well as the event name itself
:param event: The event name
:type event: str
:param handler: A coroutine function to be called
when the event is generated
:type handler: Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]]
:param priority: The handler's execution priority,
defaults to 0 (lower priority means earlier
execution!)
:type priority: int, optional
:param oneshot: If True, the event is unregistered after if fires the first time,
defaults to False
:type oneshot: bool, optional
"""
heappush(self.handlers[event], (priority, time.monotonic(), handler, oneshot))
def unregister_event(self, event: str):
"""
Unregisters all handlers for the given
event in one go. Does nothing if the
given event is not registered already and
raise_on_missing equals False (the default).
Note that this does not affect any
already started event handler for the
given event
:param event: The event name
:type event: str
:raises:
UnknownEvent: If self.on_unknown_error == UnknownEventHandling.ERROR
"""
self.handlers.pop(event, None)
def _get(
self, event: str, handler: Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]]
) -> None | bool | Tuple[int, float, Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]], bool]:
"""
Returns the tuple of (priority, date, corofunc, oneshot) representing the
given handler. Only the first matching entry is returned. If
raise_on_missing is False, None is returned if the given
event does not exist. False is returned if the given
handler is not registered for the given event
Note: This method is meant mostly for internal use
:param event: The event name
:type event: str
:param handler: A coroutine function to be called
when the event is generated
:type handler: Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]]
:raises:
UnknownEvent: If self.on_unknown_error == UnknownEventHandling.ERROR
:returns: The tuple of (priority, date, coro) representing the
given handler
"""
if not self.exists(event):
return None
for (priority, tie, corofunc, oneshot) in self.handlers[event]:
if corofunc == handler:
return priority, tie, corofunc, oneshot
return False
def unregister_handler(
self,
event: str,
handler: Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]],
remove_all: bool = False,
):
"""
Unregisters a given handler for the given event. If
remove_all is True (defaults to False), all occurrences
of the given handler are removed, otherwise only the first
one is unregistered. Does nothing if the given event is not
registered already and raise_on_missing equals False (the default).
This method does nothing if the given event exists, but the given
handler is not registered for it
:param event: The event name
:type event: str
:param handler: The coroutine function to be unregistered
:type handler: Callable[["AsyncEventEmitter", str], Coroutine[Any, Any, Any]]
:param remove_all: If True, all occurrences of the
given handler are removed, otherwise only the first
one is unregistered
:type remove_all: bool, optional
:raises:
UnknownEvent: If self.on_unknown_error == UnknownEventHandling.ERROR
:return:
"""
if remove_all:
for (priority, tie, coro, oneshot) in self.handlers[event]:
if handler == coro:
self.handlers[event].remove((priority, tie, coro, oneshot))
else:
if t := self._get(event, handler):
self.handlers[event].remove(t)
# We maintain the heap queue invariant
heapify(self.handlers[event])
async def wait(self, event: Optional[str] = None):
"""
Waits until all the event handlers for the given
event have finished executing. When the given event
is None, the default, waits for all handlers of all
events to terminate. This method is a no-op when the
emitter is configured with anything other than
ExecutionMode.NOWAIT or if emit() hasn't been called
with block=False
"""
if not event:
await asyncio.gather(*[t[1] for t in self._tasks])
self._tasks = []
else:
await self._check_event(event)
await asyncio.gather(*[t[1] for t in self._tasks if t[0] == event])
async def emit(self, event: str, block: bool = True):
"""
Emits an event
:param event: The event to trigger. Note that,
depending on the configuration, unknown events
may raise errors or log to stderr
:type event: str
:param block: Temporarily overrides the emitter's global execution
mode. If block is True, the default, this call will pause until
execution of all event handlers has finished, otherwise it returns
as soon as they're scheduled
:type block: bool, optional
:raises:
UnknownEvent: If self.on_unknown_error == UnknownEventHandling.ERROR
and the given event is not registered
"""
mode = self.mode
if block:
self.mode = ExecutionMode.PAUSE
self._emit_impl = self._emit_await
else:
self.mode = ExecutionMode.NOWAIT
self._emit_impl = self._emit_nowait
await self._emit_impl(event)
self.mode = mode

1
requirements.txt

@ -0,0 +1 @@
setuptools~=59.2.0

32
setup.py

@ -0,0 +1,32 @@
# Copyright (C) 2021 nocturn9x
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import setuptools
if __name__ == "__main__":
with open("README.md") as readme:
long_description = readme.read()
setuptools.setup(
name="asyncevents",
version="0.1",
author="Nocturn9x",
author_email="hackhab@gmail.com",
description="Asynchronous event handling in modern Python",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/nocturn9x/asyncevents",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
"License :: OSI Approved :: Apache License 2.0",
],
python_requires=">=3.8",
)

27
tests/on_error.py

@ -0,0 +1,27 @@
import asyncio
from asyncevents import on_event, emit, get_current_emitter, ExceptionHandling
@on_event("error")
async def oh_no(_, event: str):
print("Goodbye!")
raise ValueError("D:")
async def main():
try:
await emit("error") # The error propagates
except ValueError:
print("Bang!")
# Now let's try a different error handling strategy
get_current_emitter().on_error = ExceptionHandling.LOG # Logs the exception
await emit("error") # This won't raise. Yay!
print("We're safe!")
# And a different one again
get_current_emitter().on_error = ExceptionHandling.IGNORE # Silences the exception
await emit("error") # This won't raise nor log anything to the console. Yay x2!
print("We're safe again!")
if __name__ == "__main__":
asyncio.run(main())

36
tests/simple_example.py

@ -0,0 +1,36 @@
import time
import asyncio
from asyncevents import on_event, emit, wait
@on_event("hello")
async def hello(_, event: str):
print(f"Hello {event!r}!")
@on_event("hi")
async def hi(_, event: str):
print(f"Hi {event!r}! I'm going to sleep for 5 seconds")
await asyncio.sleep(5) # Simulates some work
async def main():
print("Firing blocking event 'hello'")
await emit("hello") # This call blocks until hello() terminates
print("Handlers for event 'hello' have exited")
# Notice how, until here, the output is in order: this is on purpose!
# When using blocking mode, asyncevents even guarantees that handlers
# with different priorities will be executed in order
print("Firing blocking event 'hello'")
await emit("hi", block=False) # This one spawns hi() and returns immediately
print("Non-blocking event 'hello' fired")
await emit("event3") # Does nothing: No handlers registered for event3!
# We wait now for the the handler of the "hi" event to complete
t = time.time()
print("Waiting on event 'hi'")
await wait("hi") # Waits until all the handlers triggered by the "hi" event exit
print(f"Waited for {time.time() - t:.2f} seconds") # Should print roughly 5 seconds
if __name__ == "__main__":
asyncio.run(main())
Loading…
Cancel
Save