diff --git a/tests/chrono.py b/tests/chrono.py new file mode 100644 index 0000000..c86af5f --- /dev/null +++ b/tests/chrono.py @@ -0,0 +1,445 @@ +import time +from datetime import datetime, timedelta +from dataclasses import dataclass, field +from abc import ABC, abstractmethod +from threading import Lock, Event, Thread +from concurrent.futures import ProcessPoolExecutor +from multiprocessing import cpu_count +from timeit import default_timer + +import structio + + +def parse_delay(delta: str) -> timedelta: + """ + Parse a delay string like '1y 2mo 3w 4d 5h 6m 7s' + into a timedelta object. Case and space insensitive + """ + + second = 1 + minute = second * 60 + hour = minute * 60 + day = hour * 24 + week = day * 7 + # 30 days is just the most commonly accepted + # duration for a month, even though 4 weeks + # also is, and they don't always match up in + # duration + month = day * 30 + # No leap years or stuff like that. Sorry! + year = day * 365 + units = {"s": second, + "m": minute, + "h": hour, + "d": day, + "w": week, + "mo": month, + "y": year} + # We need this to handle multi-character units + # like months + longest = max(len(k) for k in units.keys()) + seconds = 0 + default_unit = unit = units['d'] # Defaults to days + value = "" + # Clean up the string, we don't need this junk anyway + delta = delta.translate(str.maketrans({"\n": "", "\t": "", "\r": "", " ": ""})).lower() + i = 0 + while i < len(delta): + ch = delta[i] + i += 1 + # isdigit will return True even for things like numbers + # in Arabic and stuff. For my use case this is undesirable, + # but if you care about that, just swap isnumeric() for isdigit() + if ch.isnumeric(): + value += ch + # Only restrict ourselves to ASCII letters. Again, this is specific + # to my use-case. Can edit as necessary + elif ch.isalpha() and ch.isascii(): + unit_name = ch + j = i + # Try to parse as many characters of the unit as possible, + # but stop once we either reach the end of the string or + # the length of the current (potential) unit equals the + # length of the longest one we know + while j < len(delta) and len(unit_name) < longest: + if (char := delta[j]).isalpha() and char.isascii(): + unit_name += char + else: + break + j += 1 + # If the unit was more than one character long, we + # have to skip the next len(unit_name) - 1 characters, + # so we don't try to parse 'mo' and then 'o' again, for + # example. If the unit was one character long, this value + # will just be zero + i += len(unit_name) - 1 + try: + unit = units[unit_name] + except KeyError: + raise ValueError(f"invalid unit {unit_name!r}") + if value: + seconds += unit * int(value) + value = "" + unit = default_unit + else: + # Unknown character + raise ValueError(f"Invalid character {ch!r} in time delta: {delta!r}") + if value: + seconds = unit * int(value) + + return timedelta(seconds=seconds) + + +@dataclass(order=True) +class ScheduledTask: + """ + A scheduled task + """ + + command: str = field(compare=True) + when: timedelta = field(compare=True) + last: datetime | None = field(compare=False, default=None) + + def should_run(self): + if self.last is None: + return True + return (datetime.now() - self.last) >= self.when + + +class TaskScheduler(ABC): + """ + A task scheduler. The concurrency_limit argument + limits how many tasks can be run at the same time + """ + + def __init__(self, concurrency_limit: int | None = None): + """ + Public object constructor + """ + + self._running = False + self._stop = False + self._timer = default_timer + self.concurrency_limit = concurrency_limit + + @property + def running(self) -> bool: + """ + Returns whether the event scheduler is running + """ + + return self._running + + @property + def time(self) -> float: + """ + Return the scheduler's internal wall clock + time, in seconds + """ + + return self._timer() + + @property + def stopping(self) -> bool: + """ + Returns whether the scheduler is in the process of + shutting down + """ + + return self.running and self._stop + + @property + def stopped(self) -> bool: + """ + Returns whether the scheduler has stopped + """ + + return not self.running and self._stop + + @abstractmethod + def schedule(self, command: str, when: timedelta) -> ScheduledTask: + """ + Schedules the given command to run with the chosen + interval. Duplicates are allowed. A ScheduledTask + object is returned + """ + + return NotImplemented() + + @abstractmethod + def unschedule(self, task: ScheduledTask): + """ + Unschedules the command associated to the given + ScheduledTask object. Note that this only removes + the command associate with this exact object, not + any of its duplicates (for that, use unschedule_all). + If the given task is not scheduled, this method does + nothing + """ + + return NotImplemented() + + @abstractmethod + def unschedule_all(self, task: ScheduledTask) -> int: + """ + Unschedules all commands associated to the + given ScheduledTask object. Unlike unschedule(), + this method checks for object equality rather than + identity, so it removes all instances of the given + command that match the given one (meaning it can be + instantiated 'ex-novo' without needing the specific + object returned by schedule()). Returns the number + of tasks that were unscheduled + """ + + return NotImplemented() + + @abstractmethod + def start(self): + """ + Start the event scheduler. Unless explicitly + stated elsewhere, tasks can be scheduled both + before and during the call to run(), and they + will be executed as expected. Tasks scheduled + after calling stop() will be executed at the + next call to run(). If the scheduler is already + running, RuntimeError is raised. Note that, depending + on the underlying implementation, this method call may + block until the event loop is stopped by calling stop() + (usually by another thread or task) + """ + + return NotImplemented() + + @abstractmethod + def stop(self): + """ + Stop the event scheduler. It can be restarted + by calling start() again. Note that the scheduler + will first terminate running active tasks before + stopping, but stop() does not block to reflect that + """ + + return NotImplemented() + + +class SyncTaskScheduler(TaskScheduler): + """ + A synchronous, thread-safe + task scheduler. Calling start() blocks + the calling thread until the scheduler + is stopped by calling stop() in another + thread. Commands are executed in their + own process + """ + + def __init__(self, concurrency_limit: int | None = None): + super().__init__(concurrency_limit) + self._sem = Lock() + self._evt = Event() + self._executor = ProcessPoolExecutor(cpu_count() * 2 - 1 if concurrency_limit is None else concurrency_limit) + self._tie = 0 + self._tasks: list[tuple[ScheduledTask, int]] = [] + + @staticmethod + def _run_command(task: ScheduledTask): + print(f"[{datetime.now().strftime('%d/%m/%Y %H:%M:%S %p')}] Running command {task.command!r}") + # TODO: os.exec(...) + + def schedule(self, command: str, when: timedelta) -> ScheduledTask: + task = ScheduledTask(command, when) + with self._sem: + self._tasks.append((task, self._tie)) + self._tie += 1 + self._tasks.sort() + # Wakeup other thread blocked in start() + self._evt.set() + self._evt.clear() + return task + + def unschedule(self, task: ScheduledTask): + with self._sem: + for t in self._tasks: + if t[0] is task: + self._tasks.remove(t) + self._tasks.sort() + break + + def unschedule_all(self, task: ScheduledTask) -> int: + result = 0 + with self._sem: + for t in self._tasks: + if t[0] == task: + self._tasks.remove(t) + result += 1 + if result > 0: + self._tasks.sort() + return result + + def start(self): + with self._sem: + if self.running: + raise RuntimeError("scheduler is already running") + self._stop = False + while True: + with self._sem: + if self._stop: + self._running = False + break + self._sem.acquire() + if not self._tasks: + # No tasks to run. To avoid busy-waiting, we + # go to sleep indefinitely on an event until + # a task is scheduled. Note how we don't hold + # the lock before calling wait, this is to avoid + # a deadlock when other threads call schedule() + self._sem.release() + self._evt.wait() + if self._sem.locked(): + self._sem.release() + tasks: list[ScheduledTask] = [] + with self._sem: + for task, _ in self._tasks: + if task.should_run(): + tasks.append(task) + task.last = datetime.now() + self._executor.map(self._run_command, tasks) + with self._sem: + deadline = self._tasks[0][0].when.total_seconds() + # Wait again until either the closest deadline is due + # or when a new task was added (this is to make sure + # that the scheduler stays responsive to schedule() + # calls made by other threads during its runtime) + self._evt.wait(timeout=deadline) + + def stop(self): + with self._sem: + self._stop = True + + +class AsyncTaskScheduler(TaskScheduler): + """ + An asynchronous task scheduler. Not thread + safe. + """ + + def __init__(self, concurrency_limit: int | None = None): + super().__init__(concurrency_limit) + if self.concurrency_limit is None: + self.concurrency_limit = 20 + self._sem = structio.Lock() + # Task concurrency limiter + self._cap = structio.Semaphore(self.concurrency_limit) + self._evt = structio.Event() + self._tie = 0 + self._tasks: list[tuple[ScheduledTask, int]] = [] + + async def _run_command(self, task: ScheduledTask): + print(f"[{datetime.now().strftime('%d/%m/%Y %H:%M:%S %p')}] Running command {task.command!r}") + # TODO: await structio.parallel.run(...) + + # Allow more commands to be executed by releasing + # a slot + await self._cap.release() + + def schedule(self, command: str, when: timedelta) -> ScheduledTask: + task = ScheduledTask(command, when) + self._tasks.append((task, self._tie)) + self._tie += 1 + self._tasks.sort() + # Wakeup other task blocked in start() + self._evt.set() + # Events are cheap, so they don't have a + # clear method. We just create a new event + self._evt = structio.Event() + return task + + def unschedule(self, task: ScheduledTask): + for t in self._tasks: + if t[0] is task: + self._tasks.remove(t) + self._tasks.sort() + break + + def unschedule_all(self, task: ScheduledTask) -> int: + result = 0 + for t in self._tasks: + if t[0] == task: + self._tasks.remove(t) + result += 1 + if result > 0: + self._tasks.sort() + return result + + async def start(self): + if self.running: + raise RuntimeError("scheduler is already running") + self._stop = False + async with structio.create_pool() as pool: + while True: + if self._stop: + self._running = False + break + if not self._tasks: + await self._evt.wait() + tasks: list[ScheduledTask] = [] + for task, _ in self._tasks: + if task.should_run(): + tasks.append(task) + task.last = datetime.now() + for task in tasks: + await self._cap.acquire() + pool.spawn(self._run_command, task) + deadline = self._tasks[0][0].when.total_seconds() + # Wait again until either the closest deadline is due + # or when a new task was added (this is to make sure + # that the scheduler stays responsive to schedule() + # calls made by other tasks during its runtime) + with structio.skip_after(deadline): + await self._evt.wait() + + def stop(self): + self._stop = True + + +def main_threaded(): + print("Using synchronous scheduler") + scheduler: TaskScheduler = SyncTaskScheduler() + scheduler.schedule("test", parse_delay("1s")) + scheduler.schedule("test 2", parse_delay("2s")) + scheduler_thread = Thread(target=scheduler.start) + scheduler_thread.start() + time.sleep(5) + print("Adding 3rd task") + scheduler.schedule("test 3", parse_delay("3s")) + time.sleep(10) + print("Removing third task") + scheduler.unschedule_all(ScheduledTask("test 3", parse_delay("3s"))) + time.sleep(5) + print("Stopping") + scheduler.stop() + scheduler_thread.join() + print("Stopped") + + +async def main_async(): + print("Using asynchronous scheduler") + scheduler: TaskScheduler = AsyncTaskScheduler() + scheduler.schedule("test", parse_delay("1s")) + scheduler.schedule("test 2", parse_delay("2s")) + async with structio.create_pool() as pool: + pool.spawn(scheduler.start) + await structio.sleep(5) + print("Adding 3rd task") + scheduler.schedule("test 3", parse_delay("3s")) + await structio.sleep(10) + print("Removing third task") + scheduler.unschedule_all(ScheduledTask("test 3", parse_delay("3s"))) + await structio.sleep(5) + print("Stopping") + scheduler.stop() + print("Stopped") + + +structio.run(main_async) +main_threaded() diff --git a/tests/httpcore_test.py b/tests/httpcore_test.py new file mode 100644 index 0000000..57e81d1 --- /dev/null +++ b/tests/httpcore_test.py @@ -0,0 +1,10 @@ +import httpcore +import structio + + +async def main(): + pool = httpcore.AsyncConnectionPool() + print(await pool.request("GET", "http://example.com")) + + +structio.run(main) diff --git a/tests/https_test.py b/tests/tls_test.py similarity index 100% rename from tests/https_test.py rename to tests/tls_test.py