import types import datetime from collections import deque, defaultdict from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE from inspect import iscoroutine from functools import wraps import socket from .exceptions import GiambioError import traceback return_values = {} # Saves the return values from coroutines exceptions = {} # Saves exceptions from errored coroutines def sync_only(func): @wraps(func) def wrapper(*args, **kwargs): if iscoroutine(2): raise RuntimeError(f"Function '{func.__name__}' MUST be called from a synchronous context!") return func(*args, **kwargs) return wrapper class Task: """A simple wrapper around a coroutine object""" def __init__(self, coroutine: types.coroutine): self.coroutine = coroutine self.status = False # Not ran yet self.joined = False def run(self): self.status = True return self.coroutine.send(None) def __repr__(self): return f"